def translate_metrics_to_records()

in lambda/metric_stream_producer.py [0:0]


def translate_metrics_to_records(metrics_data: List[dict], time: datetime, event: dict, context: dict, metric_sets):
    """Translate CW metrics list to Kinesis stream records.

    Each entry of ``metrics_data`` is enriched in place and then serialised
    to JSON as the ``Data`` field of one output record.

    Args:
        metrics_data: Metric result dicts. Each must carry ``'Id'``,
            ``'Timestamps'`` and ``'Values'`` keys. The dicts are mutated:
            the enrichment keys listed below are written onto them.
        time: Collection time. Its ``tzinfo`` is replaced with UTC (the
            wall-clock fields are not converted) before ISO formatting.
        event: Invocation event; ``event['frequency']`` is copied to the
            ``'Frequency'`` key of every record.
        context: Invocation context. NOTE: although annotated ``dict``, it is
            accessed by attribute (``context.invoked_function_arn``); the
            account id and region are taken from fields 4 and 3 of that
            colon-separated ARN.
        metric_sets: Re-iterable collection of metric definitions exposing
            ``unique_id()``, ``namespace``, ``name``, ``period``,
            ``statistic``, ``metadata`` and ``dimensions`` (the latter two
            being iterables of objects with ``name`` / ``value``, or falsy).

    Returns:
        A list of ``{'Data': <json str>, 'PartitionKey': 'default'}`` dicts,
        one per entry of ``metrics_data``, in the same order.
    """
    records = []

    for metric_object in metrics_data:
        for metric in metric_sets:
            if metric.unique_id() != metric_object['Id']:
                continue
            metric_object['Namespace'] = metric.namespace
            metric_object['Name'] = metric.name
            metric_object['Period'] = metric.period
            metric_object['Statistic'] = metric.statistic
            # Build a fresh mapping for every metric. Sharing one dict across
            # the whole call made keys from earlier metrics leak into the
            # Metadata/Dimensions of later records.
            if metric.metadata:
                metric_object['Metadata'] = {
                    meta.name: meta.value for meta in metric.metadata
                }
            if metric.dimensions:
                metric_object['Dimensions'] = {
                    dimension.name: dimension.value
                    for dimension in metric.dimensions
                }

        arn_parts = context.invoked_function_arn.split(":")
        timestamps = metric_object['Timestamps']
        values = metric_object['Values']

        metric_object['CollectionTime'] = time.replace(tzinfo=timezone.utc).isoformat()
        metric_object['AccountId'] = arn_parts[4]
        metric_object['Region'] = arn_parts[3]
        # Only the first datapoint is surfaced; None when none was returned.
        metric_object['MetricTimestamp'] = timestamps[0] if timestamps else None
        metric_object['MetricValue'] = values[0] if values else None
        metric_object['Frequency'] = event['frequency']
        records.append({
            # default=str stringifies anything json cannot encode natively
            # (e.g. datetime timestamps).
            'Data': json.dumps(metric_object, default=str),
            'PartitionKey': 'default'
        })
    print(records)
    return records