hasher-matcher-actioner/hmalib/metrics/cloudwatch.py
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved

import logging
import typing as t
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum

import boto3

from hmalib import metrics

logger = logging.getLogger(__name__)

class AWSCloudWatchUnit(Enum):
Seconds = "Seconds"
Microseconds = "Microseconds"
Milliseconds = "Milliseconds"
Bytes = "Bytes"
Kilobytes = "Kilobytes"
Megabytes = "Megabytes"
Gigabytes = "Gigabytes"
Terabytes = "Terabytes"
Bits = "Bits"
Kilobits = "Kilobits"
Megabits = "Megabits"
Gigabits = "Gigabits"
Terabits = "Terabits"
Percent = "Percent"
Count = "Count"
Bytes_per_Second = "Bytes/Second"
Kilobytes_per_Second = "Kilobytes/Second"
Megabytes_per_Second = "Megabytes/Second"
Gigabytes_per_Second = "Gigabytes/Second"
Terabytes_per_Second = "Terabytes/Second"
Bits_per_Second = "Bits/Second"
Kilobits_per_Second = "Kilobits/Second"
Megabits_per_Second = "Megabits/Second"
Gigabits_per_Second = "Gigabits/Second"
Terabits_per_Second = "Terabits/Second"
Count_per_Second = "Count/Second"


@dataclass
class AWSCloudWatchMetricDatum:
    """
    A single AWS CloudWatch MetricDatum, as passed in PutMetricData's
    MetricData list.
    """
metric_name: str
value: t.Optional[float] = None
dimensions: t.Optional[t.Dict[str, str]] = None
timestamp: datetime = field(default_factory=datetime.utcnow)
values: t.Optional[t.List[float]] = None
counts: t.Optional[t.List[int]] = None
unit: t.Optional[AWSCloudWatchUnit] = field(default=None)

    def to_dict(self) -> t.Dict:
        """
        Serialize into the dict shape expected by put_metric_data's MetricData.
        """
        result: t.Dict[str, t.Any] = {
            "MetricName": self.metric_name,
        }
        if self.timestamp:
            result["Timestamp"] = self.timestamp
        if self.value is not None:
            # Check against None so a legitimate value of 0 is still reported.
            result["Value"] = self.value
        if self.unit:
            result["Unit"] = self.unit.value
        if self.values:
            result["Values"] = self.values
        if self.counts:
            result["Counts"] = self.counts
        if self.dimensions:
            # CloudWatch expects dimensions as a list of Name/Value pairs.
            result["Dimensions"] = [
                {"Name": k, "Value": v} for k, v in self.dimensions.items()
            ]
        return result
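

# A hedged illustration (not part of the library's API) of the dict shape
# to_dict() produces; the metric name and value below are made up:
#
#   AWSCloudWatchMetricDatum(
#       metric_name="hashes.pdq", value=12.0, unit=AWSCloudWatchUnit.Count
#   ).to_dict()
#   # -> {"MetricName": "hashes.pdq", "Timestamp": datetime(...),
#   #     "Value": 12.0, "Unit": "Count"}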


class AWSCloudWatchReporter:
    """
    A metrics reporter that publishes metrics to CloudWatch when called. For
    now, only timers and counters are supported, because that is all that is
    needed to measure the performance of functions.
    """

    # https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_PutMetricData.html
    # TODO: Do something better than skipping writes when a limit is hit.
    PUT_METRIC_DATA_VALUES_LIMIT = 150
    PUT_METRIC_PER_PUT_LIMIT = 20

    def __init__(self, namespace: str):
self.client = boto3.client("cloudwatch")
self.namespace = namespace

    def get_multi_value_datums(
        self,
        name: str,
        value_count_mapping: t.Mapping[t.Union[int, float], int],
        unit: AWSCloudWatchUnit,
    ) -> t.Optional[AWSCloudWatchMetricDatum]:
        """
        For reporting multiple values. Requires a mapping from value ->
        recurrence count.

        Returns a single datum you can then report, or None if the number of
        distinct values would exceed the PutMetricData limit.

        e.g. If you saw the following latencies, [1, 2, 2, 3, 3, 3, 4, 5, 5, 1],
        value_count_mapping would look like:

        {
            1: 2,
            2: 2,
            3: 3,
            4: 1,
            5: 2,
        }
        """
if (
not value_count_mapping
or len(value_count_mapping) >= self.PUT_METRIC_DATA_VALUES_LIMIT
):
logger.warning(
"Skipping `AWSCloudWatchReporter.get_multi_value_datums`: number of metric subvalues would have errored on write."
)
return None
values = []
counts = []
for k, v in value_count_mapping.items():
values.append(k)
counts.append(v)
return AWSCloudWatchMetricDatum(
metric_name=name, values=values, counts=counts, unit=unit
)
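
    # Hedged usage sketch (not exercised anywhere in this module); `reporter`
    # and the "search.latency" metric name are made-up examples, and
    # collections.Counter is just one convenient way to build the mapping:
    #
    #   from collections import Counter
    #
    #   latencies_ms = [1, 2, 2, 3, 3, 3, 4, 5, 5, 1]
    #   datum = reporter.get_multi_value_datums(
    #       "search.latency", Counter(latencies_ms), AWSCloudWatchUnit.Milliseconds
    #   )
    #   if datum:
    #       reporter.report([datum])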

    def get_counter_datum(
self,
name: str,
value: int,
) -> AWSCloudWatchMetricDatum:
"""
For reporting counts. Returns a single datum.
"""
return AWSCloudWatchMetricDatum(
metric_name=name, value=value, unit=AWSCloudWatchUnit.Count
)

    def report(self, metric_datums: t.List[AWSCloudWatchMetricDatum]):
        """
        Publish metric datums to CloudWatch. Skips the write entirely if the
        batch is empty or larger than the per-call limit.
        """
        if metric_datums and len(metric_datums) <= self.PUT_METRIC_PER_PUT_LIMIT:
            self._put_metric_data(self.namespace, metric_datums)
        else:
            logger.warning(
                "Skipping `AWSCloudWatchReporter.report`: number of metric datums would have errored on write."
            )

    def _put_metric_data(
self, namespace: str, metric_datums: t.List[AWSCloudWatchMetricDatum]
):
self.client.put_metric_data(
Namespace=namespace, MetricData=[x.to_dict() for x in metric_datums]
)
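

# A minimal, hedged end-to-end sketch of how this reporter is meant to be
# used. The namespace and metric name below are made-up examples; running
# this requires AWS credentials (and a configured region) with permission to
# call cloudwatch:PutMetricData.
if __name__ == "__main__":
    reporter = AWSCloudWatchReporter(namespace="hma/example")
    datum = reporter.get_counter_datum(name="hashes.processed", value=42)
    reporter.report([datum])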