hasher-matcher-actioner/hmalib/metrics/__init__.py

"""
Defines some wrappers which, when invoked with the "correct" environment
variable, will instrument functions and drop their metrics to CloudWatch.
Without the environment variable, everything is a no-op.
"""

import collections
from contextlib import contextmanager
from functools import wraps
import logging
import time
import typing as t
import os

_ENABLE_PERF_MEASUREMENTS_ENVVAR = "MEASURE_PERFORMANCE"

measure_performance: bool = os.getenv(_ENABLE_PERF_MEASUREMENTS_ENVVAR, "False") in [
    "True",
    "1",
]

logger = logging.getLogger(__name__)


class lambda_with_datafiles:
    def prefix_impl(self):
        raise NotImplementedError()

    @property
    def download_datafiles(self):
        return f"{self.prefix_impl()}.download_datafiles"

    @property
    def parse_datafiles(self):
        return f"{self.prefix_impl()}.parse_datafiles"


class names:
    """
    Not a real class, just a bag of metric names. Ignore the lowercase name if
    you can. :)

    Because metric naming can have a very real impact on how dashboards are
    built, use this central location to make the conventions easier to follow.

    Conventions:
    - The metric name should be of the format "{prefix}.{action}_{noun}"
    - prefixes can be dotted
    - action and noun both should be free of special characters
    - if used as a timer, will be suffixed by "-count" and "-duration"
    """

    hma_namespace = "ThreatExchange/HMA"

    class pdq_hasher_lambda:
        _prefix = "lambdas.pdqhasher"

        download_file = f"{_prefix}.download_file"
        hash = f"{_prefix}.hash"

    class pdq_indexer_lambda(lambda_with_datafiles):
        _prefix = "lambdas.pdqindexer"

        def prefix_impl(self):
            return self._prefix

        merge_datafiles = f"{_prefix}.merge_datafiles"
        build_index = f"{_prefix}.build_index"
        upload_index = f"{_prefix}.upload_index"

    class pdq_matcher_lambda:
        _prefix = "lambdas.pdqmatcher"

        download_index = f"{_prefix}.download_index"
        parse_index = f"{_prefix}.parse_index"
        search_index = f"{_prefix}.search_index"
        write_match_record = f"{_prefix}.write_match_record"

    class api_hash_count(lambda_with_datafiles):
        _prefix = "api.hashcount"

        def prefix_impl(self):
            return self._prefix

    class hasher:
        _prefix = "hasher"

        download_file = f"{_prefix}.download_file"
        write_record = f"{_prefix}.write_record"
        publish_message = f"{_prefix}.publish_message"

        @classmethod
        def hash(cls, hash_type: str):
            return f"{cls._prefix}.hash.{hash_type}"

    class indexer:
        _prefix = "indexer"

        download_datafiles = f"{_prefix}.download_datafiles"
        parse_datafiles = f"{_prefix}.parse_datafiles"
        build_index = f"{_prefix}.build_index"
        upload_index = f"{_prefix}.upload_index"
        merge_datafiles = f"{_prefix}.merge_datafiles"
        search_index = f"{_prefix}.search_index"
        download_index = f"{_prefix}.download_index"
        get_bank_data = f"{_prefix}.get_bank_data"


_METRICS_NAMESPACE_ENVVAR = "METRICS_NAMESPACE"

METRICS_NAMESPACE = os.getenv(_METRICS_NAMESPACE_ENVVAR, names.hma_namespace)

counts: collections.Counter = collections.Counter()
timers: t.Mapping[str, collections.Counter] = collections.defaultdict(
    collections.Counter
)


@contextmanager
def _no_op_timer(name):
    yield


def _no_op_flush(namespace: str = "does not matter"):
    pass


if measure_performance:
    logger.info(
        "Performance measurement requested. Supplying appmetrics instrumentation."
    )

    from hmalib.metrics.cloudwatch import AWSCloudWatchReporter, AWSCloudWatchUnit

    @contextmanager
    def _timer_wrapper(name):
        """
        While in most other setups you'd see some amount of sampling, lambdas
        are different. A single container will be used for batches sized at
        10/100. At that size, sampling would lose information.

        So, while we stream all data to CloudWatch, CloudWatch on its end will
        do the necessary statistical math to give us quantiles.

        We don't expect the lambdas to be used in multi-threaded environments.
        This impl is not guaranteed to be thread-safe.

        A timer does two things: it counts and it times. It will result in two
        metrics, "{name}-duration" and "{name}-count".
        """
        count_name = f"{name}-count"
        duration_name = f"{name}-duration"

        start_ms: int = int(time.perf_counter() * 1000)
        yield
        duration_ms: int = int(time.perf_counter() * 1000) - start_ms

        timers[duration_name].update({duration_ms: 1})
        counts.update({count_name: 1})

    def _metrics_flush(namespace: str = METRICS_NAMESPACE):
        """
        Flushes metrics to an AWS reporter.

        Warning: the flush will not go through if it would exceed
        PutMetricData's limit. See AWSCloudWatchReporter.PUT_METRIC_DATA_VALUES_LIMIT
        """
        try:
            reporter = AWSCloudWatchReporter(namespace)
            datums = []
            datums.extend(
                [reporter.get_counter_datum(k, v) for k, v in counts.items()]
            )

            for duration_name, value_count_mapping in timers.items():
                # If value_count_mapping is empty or larger than
                # PUT_METRIC_DATA_VALUES_LIMIT, the method returns None. Filter
                # those out, otherwise report() will throw an error.
                if datum := reporter.get_multi_value_datums(
                    name=duration_name,
                    value_count_mapping=value_count_mapping,
                    unit=AWSCloudWatchUnit.Milliseconds,
                ):
                    datums.append(datum)

            reporter.report(datums)
        except Exception:
            logger.exception("Couldn't report metrics to cloudwatch")

    flush = _metrics_flush
    timer = _timer_wrapper
else:
    logger.info(
        "Performance measurement not requested. Supplying no-op instrumentation."
    )

    flush = _no_op_flush
    timer = _no_op_timer
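# --- Illustrative usage sketch (an assumption, not part of the original module) ---
# Shows how callers are expected to pair `timer` with `flush`: the timed block
# accumulates "<name>-count" and "<name>-duration" values which `flush` reports
# to CloudWatch when MEASURE_PERFORMANCE is truthy; otherwise both are no-ops.
# The metric chosen and the sleep below are placeholders for real work.
if __name__ == "__main__":
    with timer(names.hasher.download_file):
        time.sleep(0.01)  # stand-in for the work being measured, e.g. an S3 download
    flush()

    # The datafile names on `lambda_with_datafiles` are instance properties, so
    # they compose with the subclass's prefix at access time.
    assert (
        names.api_hash_count().download_datafiles == "api.hashcount.download_datafiles"
    )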