in liminal/runners/airflow/operators/job_status_operator.py [0:0]
def put_metric_data(self, metric):
value = metric.value
cloudwatch = self.get_conn()
dimensions = [{'Name': tag.name, 'Value': tag.value} for tag in metric.tags]
cloudwatch.put_metric_data(
Namespace=metric.namespace,
MetricData=[
{
'MetricName': metric.name,
'Dimensions': dimensions,
'Timestamp': datetime.utcnow(),
'Value': value,
'Unit': 'None',
}
],
)
self.log.info(f'Published metric: {metric.name} with value: {value}')