elasticapm/metrics/base_metrics.py (297 lines of code) (raw):

# BSD 3-Clause License # # Copyright (c) 2019, Elasticsearch BV # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # * Redistributions of source code must retain the above copyright notice, this # list of conditions and the following disclaimer. # # * Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # # * Neither the name of the copyright holder nor the names of its # contributors may be used to endorse or promote products derived from # this software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
import threading
import time
from collections import defaultdict
from typing import Union

from elasticapm.conf import constants
from elasticapm.utils.logging import get_logger
from elasticapm.utils.module_import import import_string
from elasticapm.utils.threading import IntervalTimer, ThreadManager

logger = get_logger("elasticapm.metrics")

# Hard cap on distinct (name, labels) metric objects per metric set. Once
# reached, new metrics become no-ops to bound memory usage.
DISTINCT_LABEL_LIMIT = 1000


class MetricsRegistry(ThreadManager):
    def __init__(self, client, tags=None) -> None:
        """
        Creates a new metric registry

        :param client: client instance
        :param tags: optional flat key/value map of tags applied to all metrics
        """
        self.client = client
        self._metricsets = {}
        self._tags = tags or {}
        self._collect_timer = None
        super(MetricsRegistry, self).__init__()

    def register(self, metricset: Union[str, type]) -> "MetricSet":
        """
        Register a new metric set

        :param metricset: a string with the import path of the metricset class,
            or a class object that can be used to instantiate the metricset.
            If a class object is used, you can use the class object or
            `metricset.__name__` to retrieve the metricset using `get_metricset`.
        :return: the metricset instance, or None if a string path failed to import
        """
        class_id = metricset if isinstance(metricset, str) else f"{metricset.__module__}.{metricset.__name__}"
        if class_id in self._metricsets:
            return self._metricsets[class_id]
        if isinstance(metricset, str):
            try:
                class_obj = import_string(metricset)
                self._metricsets[metricset] = class_obj(self)
            except ImportError as e:
                logger.warning("Could not register %s metricset: %s", metricset, str(e))
        else:
            self._metricsets[class_id] = metricset(self)
        # .get() because the import above may have failed, leaving no entry
        return self._metricsets.get(class_id)

    def get_metricset(self, metricset: Union[str, type]) -> "MetricSet":
        """
        Look up a previously registered metric set.

        :param metricset: import path string or class object (same key as used in `register`)
        :raises MetricSetNotFound: if no metricset was registered under that key
        """
        metricset = metricset if isinstance(metricset, str) else f"{metricset.__module__}.{metricset.__name__}"
        try:
            return self._metricsets[metricset]
        except KeyError:
            raise MetricSetNotFound(metricset)

    def collect(self) -> None:
        """
        Collect metrics from all registered metric sets and queues them for sending
        :return:
        """
        if self.client.config.is_recording:
            logger.debug("Collecting metrics")
            # keys are unused here, iterate values directly
            for metricset in self._metricsets.values():
                for data in metricset.collect():
                    self.client.queue(constants.METRICSET, data)

    def start_thread(self, pid=None) -> None:
        """Start the background timer that collects metrics every `metrics_interval`."""
        super(MetricsRegistry, self).start_thread(pid=pid)
        if self.client.config.metrics_interval:
            self._collect_timer = IntervalTimer(
                self.collect, self.collect_interval, name="eapm metrics collect timer", daemon=True
            )
            logger.debug("Starting metrics collect timer")
            self._collect_timer.start()

    def stop_thread(self) -> None:
        """Cancel the collect timer (if running) and collect one final time."""
        if self._collect_timer and self._collect_timer.is_alive():
            logger.debug("Cancelling collect timer")
            self._collect_timer.cancel()
            self._collect_timer = None
            # collect one last time
            self.collect()

    @property
    def collect_interval(self):
        # collection interval in seconds, derived from the configured timedelta
        return self.client.config.metrics_interval.total_seconds()

    @property
    def ignore_patterns(self):
        # compiled patterns of metric names that should not be collected
        return self.client.config.disable_metrics or []


class MetricSet(object):
    def __init__(self, registry) -> None:
        self._lock = threading.Lock()
        self._counters = {}
        self._gauges = {}
        self._timers = {}
        self._histograms = {}
        self._registry = registry
        # ensures the DISTINCT_LABEL_LIMIT warning is only logged once
        self._label_limit_logged = False

    def counter(self, name, reset_on_collect=False, **labels):
        """
        Returns an existing or creates and returns a new counter
        :param name: name of the counter
        :param reset_on_collect: indicate if the counter should be reset to 0 when collecting
        :param labels: a flat key/value map of labels
        :return: the counter object
        """
        return self._metric(self._counters, Counter, name, reset_on_collect, labels)

    def gauge(self, name, reset_on_collect=False, **labels):
        """
        Returns an existing or creates and returns a new gauge
        :param name: name of the gauge
        :param reset_on_collect: indicate if the gauge should be reset to 0 when collecting
        :param labels: a flat key/value map of labels
        :return: the gauge object
        """
        return self._metric(self._gauges, Gauge, name, reset_on_collect, labels)

    def timer(self, name, reset_on_collect=False, unit=None, **labels):
        """
        Returns an existing or creates and returns a new timer
        :param name: name of the timer
        :param reset_on_collect: indicate if the timer should be reset to 0 when collecting
        :param unit: Unit of the observed metric
        :param labels: a flat key/value map of labels
        :return: the timer object
        """
        return self._metric(self._timers, Timer, name, reset_on_collect, labels, unit)

    def histogram(self, name, reset_on_collect=False, unit=None, buckets=None, **labels):
        """
        Returns an existing or creates and returns a new histogram
        :param name: name of the histogram
        :param reset_on_collect: indicate if the histogram should be reset when collecting
        :param unit: Unit of the observed metric
        :param buckets: upper bounds of the histogram buckets; defaults to Histogram.DEFAULT_BUCKETS
        :param labels: a flat key/value map of labels
        :return: the histogram object
        """
        return self._metric(self._histograms, Histogram, name, reset_on_collect, labels, unit, buckets=buckets)

    def _metric(self, container, metric_class, name, reset_on_collect, labels, unit=None, **kwargs):
        """
        Returns an existing or creates and returns a metric
        :param container: the container for the metric
        :param metric_class: the class of the metric
        :param name: name of the metric
        :param reset_on_collect: indicate if the metric should be reset to 0 when collecting
        :param labels: a flat key/value map of labels
        :return: the metric object
        """
        labels = self._labels_to_key(labels)
        key = (name, labels)
        with self._lock:
            if key not in container:
                if any(pattern.match(name) for pattern in self._registry.ignore_patterns):
                    # disabled via config: store a shared no-op placeholder
                    metric = noop_metric
                elif (
                    len(self._gauges) + len(self._counters) + len(self._timers) + len(self._histograms)
                    >= DISTINCT_LABEL_LIMIT
                ):
                    if not self._label_limit_logged:
                        self._label_limit_logged = True
                        logger.warning(
                            "The limit of %d metricsets has been reached, no new metricsets will be created."
                            % DISTINCT_LABEL_LIMIT
                        )
                    metric = noop_metric
                else:
                    metric = metric_class(name, reset_on_collect=reset_on_collect, unit=unit, **kwargs)
                container[key] = metric
            return container[key]

    def collect(self):
        """
        Collects all metrics attached to this metricset, and returns it as a generator
        with one or more elements. More than one element is returned if labels are used.

        The format of the return value should be

            {
                "samples": {"metric.name": {"value": some_float}, ...},
                "timestamp": unix epoch in microsecond precision
            }
        """
        self.before_collect()
        timestamp = int(time.time() * 1000000)
        samples = defaultdict(dict)
        if self._counters:
            # iterate over a copy of the dict to avoid threading issues, see #717
            for (name, labels), counter in self._counters.copy().items():
                if counter is not noop_metric:
                    val = counter.val
                    if val or not counter.reset_on_collect:
                        samples[labels].update({name: {"value": val}})
                    if counter.reset_on_collect:
                        counter.reset()
        if self._gauges:
            for (name, labels), gauge in self._gauges.copy().items():
                if gauge is not noop_metric:
                    val = gauge.val
                    if val or not gauge.reset_on_collect:
                        samples[labels].update({name: {"value": val, "type": "gauge"}})
                    if gauge.reset_on_collect:
                        gauge.reset()
        if self._timers:
            for (name, labels), timer in self._timers.copy().items():
                if timer is not noop_metric:
                    val, count = timer.val
                    if val or not timer.reset_on_collect:
                        sum_name = ".sum"
                        if timer._unit:
                            sum_name += "." + timer._unit
                        samples[labels].update({name + sum_name: {"value": val}})
                        samples[labels].update({name + ".count": {"value": count}})
                    if timer.reset_on_collect:
                        timer.reset()
        if self._histograms:
            for (name, labels), histo in self._histograms.copy().items():
                if histo is not noop_metric:
                    counts = histo.val
                    if counts or not histo.reset_on_collect:
                        # For the bucket values, we follow the approach described by Prometheus's
                        # histogram_quantile function
                        # (https://prometheus.io/docs/prometheus/latest/querying/functions/#histogram_quantile)
                        # to achieve consistent percentile aggregation results:
                        #
                        # "The histogram_quantile() function interpolates quantile values by assuming a linear
                        # distribution within a bucket. (...) If a quantile is located in the highest bucket,
                        # the upper bound of the second highest bucket is returned. A lower limit of the lowest
                        # bucket is assumed to be 0 if the upper bound of that bucket is greater than 0. In that
                        # case, the usual linear interpolation is applied within that bucket. Otherwise, the upper
                        # bound of the lowest bucket is returned for quantiles located in the lowest bucket."
                        bucket_midpoints = []
                        for i, bucket_le in enumerate(histo.buckets):
                            if i == 0:
                                if bucket_le > 0:
                                    bucket_le /= 2.0
                            elif i == len(histo.buckets) - 1:
                                bucket_le = histo.buckets[i - 1]
                            else:
                                bucket_le = histo.buckets[i - 1] + (bucket_le - histo.buckets[i - 1]) / 2.0
                            bucket_midpoints.append(bucket_le)
                        samples[labels].update(
                            {name: {"counts": counts, "values": bucket_midpoints, "type": "histogram"}}
                        )
                    if histo.reset_on_collect:
                        histo.reset()
        if samples:
            for labels, sample in samples.items():
                result = {"samples": sample, "timestamp": timestamp}
                if labels:
                    # labels is a tuple of (key, value) pairs
                    result["tags"] = dict(labels)
                yield self.before_yield(result)

    def before_collect(self) -> None:
        """
        A method that is called right before collection. Can be used to gather metrics.
        :return:
        """
        pass

    def before_yield(self, data):
        """
        A method that is called right before the data is yielded to be sent to
        Elasticsearch. Can be used to modify the data.
        """
        return data

    def _labels_to_key(self, labels):
        # sorted tuple of (key, str(value)) pairs: hashable and order-independent
        return tuple((k, str(v)) for k, v in sorted(labels.items()))


class SpanBoundMetricSet(MetricSet):
    def before_yield(self, data):
        """Move span.*/transaction.* tags into dedicated top-level fields."""
        tags = data.get("tags", None)
        if tags:
            span_type, span_subtype = tags.pop("span.type", None), tags.pop("span.subtype", "")
            if span_type or span_subtype:
                data["span"] = {"type": span_type, "subtype": span_subtype}
            transaction_name, transaction_type = tags.pop("transaction.name", None), tags.pop("transaction.type", None)
            if transaction_name or transaction_type:
                data["transaction"] = {"name": transaction_name, "type": transaction_type}
        return data


class BaseMetric(object):
    __slots__ = ("name", "reset_on_collect")

    def __init__(self, name, reset_on_collect=False, **kwargs) -> None:
        self.name = name
        self.reset_on_collect = reset_on_collect


class Counter(BaseMetric):
    __slots__ = BaseMetric.__slots__ + ("_lock", "_initial_value", "_val")

    def __init__(self, name, initial_value=0, reset_on_collect=False, unit=None) -> None:
        """
        Creates a new counter
        :param name: name of the counter
        :param initial_value: initial value of the counter, defaults to 0
        :param unit: unit of the observed counter. Unused for counters
        """
        self._lock = threading.Lock()
        self._val = self._initial_value = initial_value
        super(Counter, self).__init__(name, reset_on_collect=reset_on_collect)

    def inc(self, delta=1):
        """
        Increments the counter. If no delta is provided, it is incremented by one
        :param delta: the amount to increment the counter by
        :returns the counter itself
        """
        with self._lock:
            self._val += delta
        return self

    def dec(self, delta=1):
        """
        Decrements the counter. If no delta is provided, it is decremented by one
        :param delta: the amount to decrement the counter by
        :returns the counter itself
        """
        with self._lock:
            self._val -= delta
        return self

    def reset(self):
        """
        Reset the counter to the initial value
        :returns the counter itself
        """
        with self._lock:
            self._val = self._initial_value
        return self

    @property
    def val(self):
        """Returns the current value of the counter"""
        return self._val

    @val.setter
    def val(self, value) -> None:
        with self._lock:
            self._val = value


class Gauge(BaseMetric):
    __slots__ = BaseMetric.__slots__ + ("_val",)

    def __init__(self, name, reset_on_collect=False, unit=None) -> None:
        """
        Creates a new gauge
        :param name: label of the gauge
        :param unit: unit of the observed gauge. Unused for gauges
        """
        self._val = None
        super(Gauge, self).__init__(name, reset_on_collect=reset_on_collect)

    @property
    def val(self):
        return self._val

    @val.setter
    def val(self, value) -> None:
        self._val = value

    def reset(self) -> None:
        # NOTE: resets to 0 (not None, the pre-first-write value)
        self._val = 0


class Timer(BaseMetric):
    __slots__ = BaseMetric.__slots__ + ("_val", "_count", "_lock", "_unit")

    def __init__(self, name=None, reset_on_collect=False, unit=None) -> None:
        # accumulated duration and number of observations
        self._val: float = 0
        self._count: int = 0
        self._unit = unit
        self._lock = threading.Lock()
        super(Timer, self).__init__(name, reset_on_collect=reset_on_collect)

    def update(self, duration, count=1) -> None:
        """Add `duration` to the total and `count` to the observation count."""
        with self._lock:
            self._val += duration
            self._count += count

    def reset(self) -> None:
        with self._lock:
            self._val = 0
            self._count = 0

    @property
    def val(self):
        """Returns a (total_duration, count) tuple, read atomically."""
        with self._lock:
            return self._val, self._count

    @val.setter
    def val(self, value) -> None:
        with self._lock:
            self._val, self._count = value


class Histogram(BaseMetric):
    DEFAULT_BUCKETS = (0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10, float("inf"))

    # fix: "_lock" was previously listed twice in __slots__
    __slots__ = BaseMetric.__slots__ + ("_lock", "_buckets", "_counts", "_unit")

    def __init__(self, name=None, reset_on_collect=False, unit=None, buckets=None) -> None:
        """
        Creates a new histogram
        :param name: name of the histogram
        :param reset_on_collect: indicate if the histogram should be reset when collecting
        :param unit: unit of the observed metric
        :param buckets: upper bounds of the buckets; a bucket with bound
            float("inf") is appended if not already present. Defaults to
            DEFAULT_BUCKETS.
        """
        self._lock = threading.Lock()
        # Copy the caller-supplied buckets: previously float("inf") was appended
        # directly to the passed-in object, mutating the caller's list (and
        # raising AttributeError for tuples).
        self._buckets = list(buckets) if buckets else list(Histogram.DEFAULT_BUCKETS)
        if self._buckets[-1] != float("inf"):
            self._buckets.append(float("inf"))
        self._counts = [0] * len(self._buckets)
        self._unit = unit
        super(Histogram, self).__init__(name, reset_on_collect=reset_on_collect)

    def update(self, value, count=1) -> None:
        """Add `count` observations of `value` to the matching bucket."""
        # linear scan terminates because the last bucket bound is float("inf")
        pos = 0
        while value > self._buckets[pos]:
            pos += 1
        with self._lock:
            self._counts[pos] += count

    @property
    def val(self):
        """Returns the per-bucket observation counts."""
        with self._lock:
            return self._counts

    @val.setter
    def val(self, value) -> None:
        with self._lock:
            self._counts = value

    @property
    def buckets(self):
        return self._buckets

    def reset(self) -> None:
        with self._lock:
            self._counts = [0] * len(self._buckets)


class NoopMetric(object):
    """
    A no-op metric that implements the "interface" of both Counter and Gauge.

    Note that even when using a no-op metric, the value itself will still
    be calculated.
    """

    def __init__(self, label, initial_value=0) -> None:
        return

    @property
    def val(self):
        return None

    @val.setter
    def val(self, value) -> None:
        return

    def inc(self, delta=1) -> None:
        return

    def dec(self, delta=1) -> None:
        # fix: default was -1, inconsistent with Counter.dec(delta=1); a no-op either way
        return

    def update(self, duration, count=1) -> None:
        return

    def reset(self) -> None:
        return


# shared singleton used wherever a metric is disabled or over the label limit
noop_metric = NoopMetric("noop")


class MetricSetNotFound(LookupError):
    def __init__(self, class_path) -> None:
        super(MetricSetNotFound, self).__init__("%s metric set not found" % class_path)


# This is for backwards compatibility for the brave souls who were using
# the undocumented system for custom metrics before we fixed it up and
# documented it.
MetricsSet = MetricSet