in lambda/sla_stream_producer.py [0:0]
def translate_clas_to_records(slas_data: List[dict], time: datetime, event: dict, context: dict, metric_sets):
    """Translate CW sla list to Kinesis stream records.

    Every alarm dict in ``slas_data`` is compared against every entry of
    ``metric_sets``. For each (alarm, metric) pair whose ids match, the alarm
    dict is enriched in place and one record is emitted. Pairs that do not
    match produce no record.

    Args:
        slas_data: Alarm dicts. Each must provide ``'AlarmName'``,
            ``'Namespace'``, ``'Period'`` and ``'Statistic'``. The dicts are
            mutated: the enrichment keys are written directly onto them.
        time: Collection timestamp. Its ``tzinfo`` is *replaced* with UTC
            (no conversion is performed) before ISO-8601 formatting.
        event: Unused by this function.
        context: Object exposing ``invoked_function_arn``. It is read by
            attribute access, not by key, despite the ``dict`` annotation.
        metric_sets: Iterable of metric objects exposing
            ``alarm_unique_id()`` and a ``metadata`` collection whose items
            have ``name`` and ``value`` attributes. It is iterated once per
            alarm, so it must be re-iterable (e.g. a list, not a generator).

    Returns:
        A list of ``{'Data': <JSON string>, 'PartitionKey': 'default'}``
        dicts, one per matching (alarm, metric) pair.
    """
    records = []
    for sla_object in slas_data:
        # The alarm id is the dash-separated middle of the alarm name: the
        # first three and the last five segments are dropped. It does not
        # depend on the metric, so compute it once per alarm.
        resolved_alarm_id_from_cloudwatch = "-".join(sla_object['AlarmName'].split('-')[3:-5])

        for metric in metric_sets:
            # The metric-side id carries one extra trailing character that
            # has to be stripped before the two ids can be compared.
            resolved_alarm_id_from_metric_object = str(metric.alarm_unique_id())[:-1]
            if resolved_alarm_id_from_metric_object != resolved_alarm_id_from_cloudwatch:
                continue

            # The code reads account id from ARN field 4 and region from field 3.
            arn_parts = context.invoked_function_arn.split(":")
            sla_object['AccountId'] = arn_parts[4]
            sla_object['Region'] = arn_parts[3]
            sla_object['MetricNamespace'] = sla_object['Namespace']
            sla_object['MetricPeriod'] = sla_object['Period']
            sla_object['MetricStatistic'] = sla_object['Statistic']
            sla_object['CollectionTime'] = time.replace(tzinfo=timezone.utc).isoformat()

            if metric.metadata:
                # Build a fresh dict for every match. A single dict shared
                # across iterations would leak one metric's metadata into
                # the records of every alarm processed after it.
                sla_object['Metadata'] = {meta.name: meta.value for meta in metric.metadata}

            records.append({
                # default=str stringifies values json cannot encode natively
                # (e.g. datetime objects present in the alarm description).
                'Data': json.dumps(sla_object, default=str),
                'PartitionKey': 'default'
            })
    print(records)
    return records