def get_healthcheck_status()

in blogs/eks-canary-deployments-pipeline/shared_stack/lambda_functions/gather_healthcheck_status/main.py [0:0]


def get_healthcheck_status(microservice_name, failure_threshold_value, failure_threshold_time):
    """Report whether the microservice's recent 5xx count is within the threshold.

    Queries the ``5xx-<microservice_name>`` metric in the
    ``EnvoyPrometheus/ResponseCode`` namespace through the module-level ``cw``
    client (presumably a boto3 CloudWatch client -- confirm where it is
    created), summing per-60-second datapoints over the look-back window.

    Args:
        microservice_name: Suffix used to build the metric name.
        failure_threshold_value: Largest total of 5xx responses that is still
            treated as healthy; the check fails only when the total is
            strictly greater than this value.
        failure_threshold_time: Length of the look-back window, in seconds.

    Returns:
        ``False`` when the summed datapoints exceed the threshold (the total
        is also printed), ``True`` otherwise -- including when the query
        returned no datapoints at all.
    """
    # Look-back window expressed as epoch timestamps, ending "now".
    window_end = datetime.datetime.now()
    end_ts = window_end.timestamp()
    window_length = datetime.timedelta(seconds=failure_threshold_time)
    start_ts = (window_end - window_length).timestamp()

    metric_query = {
        'Id': 'id1',
        'MetricStat': {
            'Metric': {
                'Namespace': 'EnvoyPrometheus/ResponseCode',
                'MetricName': '5xx-%s' % microservice_name,
            },
            'Period': 60,
            'Stat': 'Sum',
        },
        'ReturnData': True,
    }
    response = cw.get_metric_data(
        MetricDataQueries=[metric_query],
        StartTime=start_ts,
        EndTime=end_ts,
        ScanBy='TimestampDescending',
    )

    # Only one query is submitted, so only the first result entry is read.
    datapoints = response.get('MetricDataResults')[0].get('Values')
    if not datapoints:
        # No datapoints in the window is treated as healthy.
        return True

    error_total = sum(datapoints)
    if error_total > failure_threshold_value:
        print('Found [%s] 5XX HTTP response code' % error_total)
        return False
    return True