def put_metric_data()

in liminal/runners/airflow/operators/job_status_operator.py [0:0]


    def put_metric_data(self, metric):
        value = metric.value

        cloudwatch = self.get_conn()

        dimensions = [{'Name': tag.name, 'Value': tag.value} for tag in metric.tags]

        cloudwatch.put_metric_data(
            Namespace=metric.namespace,
            MetricData=[
                {
                    'MetricName': metric.name,
                    'Dimensions': dimensions,
                    'Timestamp': datetime.utcnow(),
                    'Value': value,
                    'Unit': 'None',
                }
            ],
        )

        self.log.info(f'Published metric: {metric.name} with value: {value}')