in functions/simple/simple.py [0:0]
def check_errors_in_cloudwatch(func_name, alias_name, new_version):
client = boto3.client('cloudwatch')
func_plus_alias = func_name + ":" + alias_name
now = datetime.utcnow()
start_time = now - timedelta(minutes=1)
response = client.get_metric_statistics(
Namespace='AWS/Lambda',
MetricName='Errors',
Dimensions=[
{
'Name': 'FunctionName',
'Value': func_name
},
{
'Name': 'Resource',
'Value': func_plus_alias
},
{
'Name': 'ExecutedVersion',
'Value': new_version
}
],
StartTime=start_time,
EndTime=now,
Period=60,
Statistics=['Sum']
)
datapoints = response['Datapoints']
for datapoint in datapoints:
if datapoint['Sum'] > 0:
log.info("Failing health check because error metrics were found for new version: {0}".format(datapoints))
return False
return True