src/worker/publisher/metrics_publisher.py (180 lines of code) (raw):
import json
import logging
import shlex
import socket
import subprocess
import sys
import time
import urllib
import urllib.request
from urllib.error import URLError

from prometheus_client.parser import text_string_to_metric_families
# The publisher agent is selected by the first command-line argument; it decides
# which exporter packages are imported below ('geneva' or 'azure_monitor').
if len(sys.argv) < 2:
    # Fail with a readable message instead of an opaque IndexError from sys.argv[1]
    raise Exception('##[ERROR]Missing publisher agent argument (expected geneva or azure_monitor)')
publisher_agent = sys.argv[1]
logger = logging.getLogger(__name__)
if publisher_agent == 'geneva':
    from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
    from opentelemetry.sdk.metrics import MeterProvider
    from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader, AggregationTemporality
    from opentelemetry.sdk.resources import Resource
elif publisher_agent == 'azure_monitor':
    from azure.monitor.opentelemetry import configure_azure_monitor
else:
    raise Exception('##[ERROR]Invalid publisher agent')
from opentelemetry import metrics
from opentelemetry.sdk.metrics import Counter, Histogram
def get_geneva_exporter_meter(metrics_auth, metrics_namespace):
    """
    Build a meter whose data is pushed by an OTLP/gRPC exporter to 0.0.0.0:4317
    Args:
        metrics_auth (str): Stored as the "microsoft_metrics_account" resource attribute
        metrics_namespace (str): Stored as the "microsoft_metrics_namespace" resource attribute
    Returns:
        A meter from the global meter provider, which this call replaces
    """
    resource_attributes = {
        "microsoft_metrics_account": metrics_auth,
        "microsoft_metrics_namespace": metrics_namespace,
    }
    resource = Resource(attributes=resource_attributes)
    # Counters and histograms are both exported with delta temporality
    delta = AggregationTemporality.DELTA
    exporter = OTLPMetricExporter(
        endpoint="0.0.0.0:4317",
        insecure=True,
        preferred_temporality={Counter: delta, Histogram: delta},
    )
    provider = MeterProvider(
        resource=resource,
        metric_readers=[PeriodicExportingMetricReader(exporter)],
    )
    # Install globally so that metrics.get_meter() resolves against this provider
    metrics.set_meter_provider(provider)
    return metrics.get_meter(__name__)
def get_azure_monitor_exporter_meter(metrics_auth, metrics_namespace):
    """
    Build a meter that reports through the Azure Monitor distro
    Args:
        metrics_auth (str): The Azure Monitor connection string
        metrics_namespace (str): Used as the name of the returned meter
    Returns:
        A meter taken from the global meter provider after configure_azure_monitor()
        has run (presumably that call installs the provider -- not visible here)
    """
    configure_azure_monitor(connection_string=metrics_auth)
    provider = metrics.get_meter_provider()
    return provider.get_meter(metrics_namespace)
def shell_cmd(cmd, timeout):
    """
    Helper Function for running a command (without a shell) and capturing its output
    Args:
        cmd (str): The command line; split with shlex, so POSIX-style quoting applies
        timeout (int): Seconds to wait before the command is killed
    Returns:
        result (str): The combined stdout/stderr decoded as UTF-8 (undecodable bytes
            become U+FFFD), or the literal string 'TimeOut' if the command timed out
    """
    args = shlex.split(cmd)
    child = subprocess.Popen(args, start_new_session=True,
                             stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    try:
        output, _ = child.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        child.kill()
        # Reap the killed child and drain/close its pipe, otherwise a zombie
        # process and an open file descriptor are left behind on every timeout.
        child.communicate()
        print("##[ERROR]Command " + " ".join(args) + ", Failed on timeout")
        return 'TimeOut'
    # Stray non-UTF-8 bytes should not turn a finished command into an exception
    return output.decode('utf-8', errors='replace')
def get_publisher_metrics_config(config_path='/tmp/moneo-worker/moneo_config.json'):
    """
    Load the publisher configuration file
    Args:
        config_path (str): Path of the JSON config file
            (defaults to /tmp/moneo-worker/moneo_config.json)
    Returns:
        config(dict): The parsed configuration; the __main__ block reads its
            'publisher_config' section
    """
    # JSON is UTF-8 by specification; don't depend on the platform locale encoding
    with open(config_path, encoding='utf-8') as f:
        return json.load(f)
def _get_imds_compute_field(field, timeout=15):
    """
    Query one field of the instance metadata service "compute" section via curl
    Args:
        field (str): The compute field name, e.g. 'vmId', 'name' or 'subscriptionId'
        timeout (int): Seconds to allow curl to run (passed to shell_cmd)
    Returns:
        value(str): The first line of the text response
    Raises:
        RuntimeError: If the command produced no output at all
    """
    url = ("http://169.254.169.254/metadata/instance/compute/"
           f"{field}?api-version=2021-02-01&format=text")
    cmd = f'curl -s -H Metadata:true "{url}"'
    # NOTE(review): shell_cmd returns the literal 'TimeOut' on timeout, which is
    # passed through here as if it were the field value.
    lines = shell_cmd(cmd, timeout).splitlines()
    if not lines:
        # curl -s prints nothing on connection failure; report which lookup failed
        raise RuntimeError(
            f"##[ERROR]Empty response from instance metadata service for '{field}'")
    return lines[0]


def get_vm_id():
    """
    Get the vm id
    Returns:
        vm_id(str): The vm id
    """
    return _get_imds_compute_field('vmId')


def get_scaleset_name():
    """
    Get the scaleset name
    Returns:
        scaleset_name(str): The compute 'name' field up to its first underscore
            (presumably instance names look like '<scaleset>_<index>' -- confirm)
    """
    return _get_imds_compute_field('name').split("_")[0]


def get_subcription_id():
    """
    Get the subscription id
    Returns:
        subscription_id(str): The subscription id
    """
    return _get_imds_compute_field('subscriptionId')
class MetricsPublisher():
    """
    Scrapes Prometheus text endpoints and republishes the samples through an
    OpenTelemetry meter (Geneva via OTLP, or Azure Monitor).

    Counter samples are converted into per-interval increments before being added
    to an OpenTelemetry counter; gauge samples are recorded into a histogram.
    Every data point is tagged with the host name, VM id and scale set name.
    """

    def __init__(self, metrics_ports=None, metrics_auth=None, metrics_namespace=None):
        """
        Args:
            metrics_ports (str): Comma-separated local ports that serve /metrics
            metrics_auth (str): Geneva metrics account or Azure Monitor connection string
            metrics_namespace (str): The metrics namespace
        """
        self.metrics_ports = metrics_ports
        self.metrics_auth = metrics_auth
        self.metrics_namespace = metrics_namespace
        self.node_name = socket.gethostname()
        self.vm_id = get_vm_id()
        self.scaleset_name = get_scaleset_name()
        if publisher_agent == 'geneva':
            self.meter = get_geneva_exporter_meter(self.metrics_auth, self.metrics_namespace)
        elif publisher_agent == 'azure_monitor':
            self.meter = get_azure_monitor_exporter_meter(self.metrics_auth, self.metrics_namespace)
        else:
            print("##[ERROR]Invalid publisher agent")
        # Lazily created OpenTelemetry instruments, keyed by metric name
        self.metricNametoCounter = dict()
        self.metricNametoHistogram = dict()
        # Last scraped cumulative value of each counter series (see get_key_from_metric)
        self.metricKeytoPreviousValue = dict()

    def get_metrics(self):
        """
        Get a list of metrics from the specified metrics ports
        Returns:
            metrics(list): A list of metrics:
                [{'name': metric_name, 'value': metric_value, 'labels': metric_labels, 'type': metric_type}]
        Raises:
            URLError: If one of the local endpoints cannot be reached
        """
        # Named `collected` so the module-level opentelemetry `metrics` isn't shadowed
        collected = []
        if not self.metrics_ports:
            return collected
        for port in self.metrics_ports.split(','):
            port = port.strip()
            if not port:
                continue  # tolerate "9100,,9200" or a trailing comma
            metrics_url = f"http://localhost:{port}/metrics"
            # Close the HTTP response even if reading or decoding it fails
            with urllib.request.urlopen(metrics_url) as response:
                content = response.read().decode('utf-8')
            for metric_family in text_string_to_metric_families(content):
                metric_type = metric_family.type
                for sample in metric_family.samples:
                    collected.append({'name': sample.name, 'value': sample.value,
                                      'labels': sample.labels, 'type': metric_type})
        return collected

    def publish_metrics(self, metrics):
        """
        Publish the metrics according to the metric type
        Args:
            metrics(list): A list of metrics:
                [{'name': metric_name, 'value': metric_value, 'labels': metric_labels, 'type': metric_type}]
        """
        # `metrics` is a list, so test emptiness directly (comparing to {} never matched)
        if not metrics:
            return
        for metric in metrics:
            metric_type = metric['type']
            if metric_type == 'counter':
                self.publish_counter(metric)
            elif metric_type == 'gauge':
                self.publish_gauge(metric)
            else:
                print('This metrics type is not supported')

    def publish_counter(self, metric):
        """
        Publish counter type metrics as the increase since the previous scrape
        Args:
            metric(dict): A metric dictionary:
                {'name': metric_name, 'value': metric_value, 'labels': metric_labels, 'type': metric_type}
        """
        metric_value = metric['value'] if metric['value'] is not None else 0
        key = self.get_key_from_metric(metric)
        previous_value = self.metricKeytoPreviousValue.get(key)
        self.metricKeytoPreviousValue[key] = metric_value
        if previous_value is None:
            # First observation of this series: nothing to diff against yet
            return
        delta = metric_value - previous_value
        # If the delta is negative, the source counter has been reset; skip this interval
        if delta < 0:
            return
        if metric['name'] not in self.metricNametoCounter:
            self.metricNametoCounter[metric['name']] = self.meter.create_counter(metric['name'])
        counter = self.metricNametoCounter[metric['name']]
        tags = self.get_tags_from_metric(metric['labels'])
        # Counter.add() takes an increment; adding the cumulative Prometheus value
        # would re-count everything seen so far on every interval.
        counter.add(delta, tags)

    def publish_gauge(self, metric):
        """
        Publish gauge type metrics by recording them into a histogram
        Args:
            metric(dict): A metric dictionary:
                {'name': metric_name, 'value': metric_value, 'labels': metric_labels, 'type': metric_type}
        """
        name = metric['name']
        if name not in self.metricNametoHistogram:
            self.metricNametoHistogram[name] = self.meter.create_histogram(name)
        histogram = self.metricNametoHistogram[name]
        metric_value = float(metric['value']) if metric['value'] is not None else 0
        histogram.record(metric_value, self.get_tags_from_metric(metric['labels']))

    def get_key_from_metric(self, metric):
        """
        Get a key identifying one series (metric name plus its labels)
        Args:
            metric(dict): A metric dictionary:
                {'name': metric_name, 'value': metric_value, 'labels': metric_labels, 'type': metric_type}
        Returns:
            key(str): '{metric_name}:{label_name}:{label_value}:...' in label order
        """
        parts = [metric['name']]
        parts.extend(f"{label_name}:{label_value}"
                     for label_name, label_value in metric['labels'].items())
        return ':'.join(parts)

    def get_tags_from_metric(self, metric_labels):
        """
        Get the tags from the metric labels
        Args:
            metric_labels(dict): A dictionary of metric labels: {label_name: label_value}
        Returns:
            tags(dict): The node_name / vm_id / scaleset_name tags merged with the
                labels; a label with one of those names overrides the node value
        """
        tags = {
            'node_name': self.node_name,
            'vm_id': self.vm_id,
            'scaleset_name': self.scaleset_name,
        }
        tags.update(metric_labels)
        return tags
if __name__ == '__main__':
    # Get the publisher agent config
    agent_config = get_publisher_metrics_config()
    publisher_config = agent_config['publisher_config']
    # Get common config (shared by every publisher agent)
    common_config = publisher_config['common_config']
    metrics_ports = common_config['metrics_ports']
    metrics_namespace = common_config['metrics_namespace']
    interval = int(common_config['interval'])
    # Get publisher agent specific credentials
    if publisher_agent == 'geneva':
        metrics_auth = publisher_config['geneva_agent_config']['metrics_account']
    elif publisher_agent == 'azure_monitor':
        metrics_auth = publisher_config['azure_monitor_agent_config']['connection_string']
    else:
        raise Exception('##[ERROR]The publisher agent is not supported')
    # Start the publisher agent to publish metrics
    metricsPublisher = MetricsPublisher(
        metrics_ports=metrics_ports,
        metrics_auth=metrics_auth,
        metrics_namespace=metrics_namespace)
    # Scrape and publish once every `interval` seconds (common_config['interval'])
    while True:
        try:
            raw_metrics = metricsPublisher.get_metrics()
            metricsPublisher.publish_metrics(raw_metrics)
        except URLError:
            logger.exception('Network connection issue.')
        except Exception:
            # Keep publishing after unexpected errors, but unlike a bare `except:`
            # let KeyboardInterrupt / SystemExit stop the process.
            logger.exception('Failed to retrieve and publish metrics to Geneva.')
        time.sleep(interval)