in blogs/eks-canary-deployments-pipeline/shared_stack/lambda_functions/gather_healthcheck_status/main.py [0:0]
def get_healthcheck_status(microservice_name, failure_threshold_value, failure_threshold_time):
    """Report whether a microservice's recent 5xx count is within the threshold.

    Queries the ``EnvoyPrometheus/ResponseCode`` namespace for the metric
    ``5xx-<microservice_name>`` (``Sum`` statistic, 60-second period) over the
    last ``failure_threshold_time`` seconds and adds up the returned values.

    Args:
        microservice_name: Suffix used to build the metric name
            ``5xx-<microservice_name>``.
        failure_threshold_value: Highest total that is still considered
            healthy; the check fails only when the total is strictly greater.
        failure_threshold_time: Length of the look-back window, in seconds.

    Returns:
        ``False`` when the summed values exceed ``failure_threshold_value``,
        ``True`` otherwise -- including when the query returns no results or
        no datapoints.
    """
    # NOTE(review): `cw` is a module-level object defined outside this block,
    # presumably a boto3 CloudWatch client -- confirm.
    now = datetime.datetime.now()
    window_start = now - datetime.timedelta(seconds=failure_threshold_time)

    response = cw.get_metric_data(
        MetricDataQueries=[
            {
                'Id': 'id1',
                'MetricStat': {
                    'Metric': {
                        'Namespace': 'EnvoyPrometheus/ResponseCode',
                        'MetricName': '5xx-%s' % microservice_name,
                    },
                    'Period': 60,
                    'Stat': 'Sum',
                },
                'ReturnData': True,
            },
        ],
        StartTime=window_start.timestamp(),
        EndTime=now.timestamp(),
        ScanBy='TimestampDescending',
    )

    # A missing or empty result list is handled like an empty 'Values' list
    # (no datapoints) instead of raising TypeError/IndexError on the [0] lookup.
    results = response.get('MetricDataResults') or []
    values = results[0].get('Values') if results else None
    if not values:
        return True

    # Sum once and reuse it for both the comparison and the log line.
    error_count = sum(values)
    if error_count > failure_threshold_value:
        print('Found [%s] 5XX HTTP response code' % error_count)
        return False
    return True