in lambda/metric_stream_producer.py [0:0]
def translate_metrics_to_records(metrics_data: List[dict], time: datetime, event: dict, context: dict, metric_sets):
"""Translate CW metrics list to Kinesis stream records."""
records = []
metadata_map = {}
dimensions_map = {}
for metric_object in metrics_data:
for metric in metric_sets:
if metric.unique_id() == metric_object['Id']:
metric_object['Namespace'] = metric.namespace
metric_object['Name'] = metric.name
metric_object['Period'] = metric.period
metric_object['Statistic'] = metric.statistic
if metric.metadata:
for meta in metric.metadata:
metadata_map[meta.name] = meta.value
metric_object['Metadata'] = metadata_map
if metric.dimensions:
for dimension in metric.dimensions:
dimensions_map[dimension.name] = dimension.value
metric_object['Dimensions'] = dimensions_map
else:
continue
metric_object['CollectionTime'] = time.replace(tzinfo=timezone.utc).isoformat()
metric_object['AccountId'] = context.invoked_function_arn.split(":")[4]
metric_object['Region'] = context.invoked_function_arn.split(":")[3]
metric_object['MetricTimestamp'] = metric_object['Timestamps'][0] if len(metric_object['Timestamps']) > 0 else None
metric_object['MetricValue'] = metric_object['Values'][0] if len(metric_object['Values']) > 0 else None
metric_object['Frequency'] = event['frequency']
records.append({
'Data': json.dumps(metric_object, default=str),
'PartitionKey': 'default'
})
print(records)
return records