hasher-matcher-actioner/hmalib/lambdas/api/stats.py (94 lines of code) (raw):
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import bottle
import datetime
from dataclasses import dataclass, asdict, field
import typing as t
from mypy_boto3_dynamodb.service_resource import Table
from hmalib.common.logging import get_logger
from hmalib import metrics
from hmalib.metrics import query as metrics_query
from hmalib.metrics.query import is_publishing_metrics
from hmalib.common.models.count import AggregateCount
from hmalib.lambdas.api.middleware import (
jsoninator,
JSONifiable,
SubApp,
)
logger = get_logger(__name__)
@dataclass
class StatsCard(JSONifiable):
    """
    A single stat prepared for display: a count for a time span together with
    the data points used to graph it.
    """

    # Count for the selected time span.
    time_span_count: int
    # Period that the count and the graph data refer to.
    time_span: metrics_query.MetricTimePeriod
    # (moment, value) pairs; the value of a point may be None.
    graph_data: t.List[t.Tuple[datetime.datetime, t.Optional[int]]]
    # Defaults to the moment the card is constructed.
    last_updated: datetime.datetime = field(default_factory=datetime.datetime.now)

    def to_json(self) -> t.Dict:
        """
        Convert the card to JSON-friendly values.

        Datetimes are emitted as whole-second timestamps, the time span as the
        enum's value, and every graph point as a two-element list.
        """
        payload = asdict(self)

        # Override the fields whose raw values are not directly serializable.
        payload["last_updated"] = int(self.last_updated.timestamp())
        payload["time_span"] = self.time_span.value

        points = []
        for point in self.graph_data:
            points.append([int(point[0].timestamp()), point[1]])
        payload["graph_data"] = points

        return payload
@dataclass
class StatResponse(JSONifiable):
    """
    Response wrapper that carries exactly one stat card.
    """

    stat: StatsCard

    def to_json(self) -> t.Dict:
        """Return the card's JSON form nested under the "card" key."""
        card_payload = self.stat.to_json()
        return dict(card=card_payload)
@dataclass
class AggregateCountResponse(JSONifiable):
    """
    Response wrapper for a mapping of count name to aggregate count.
    """

    counts: t.Dict[str, int]

    def to_json(self) -> t.Dict:
        """Return the dataclass fields as a plain dict."""
        serialized = asdict(self)
        return serialized
def get_stats_api(counts_table: Table) -> bottle.Bottle:
    """
    Closure for all dependencies for the stats APIs.

    counts_table is the table the aggregate counts are read from. Returns the
    sub-app with the stats routes registered on it.
    """

    # A prefix to all routes must be provided by the api_root app
    # The documentation below expects prefix to be '/stats/'
    stats_api = SubApp()

    # Maps the public `stat_name` query values to the metric that backs them.
    stat_name_to_metric = {
        "hashes": metrics.names.pdq_hasher_lambda.hash,
        "matches": metrics.names.pdq_matcher_lambda.write_match_record,
    }

    @stats_api.get("/", apply=[jsoninator])
    def default_stats() -> StatResponse:
        """
        If measure performance tfvar/os.env is true, it returns stats, else,
        returns 404. A 404 should be surfaced by clients with instructions on
        how to enable metrics tracking.

        Query parameters:
          stat_name: required, one of "hashes" or "matches". A missing or
            unknown value results in a 400.
          time_span: optional, one of "1h", "24h" or "7d". Any other value
            (including none at all) falls back to 24 hours.

        The graph_data always contains the start_time and end_time timestamps
        with 0 values to make graphing easier.
        """
        if not is_publishing_metrics():
            return bottle.abort(404, "This HMA instance is not publishing metrics.")

        stat_name = bottle.request.query.stat_name
        if not stat_name or stat_name not in stat_name_to_metric:
            # Join the names explicitly; interpolating `.keys()` would leak
            # the `dict_keys([...])` repr into the client-facing message.
            valid_names = ", ".join(stat_name_to_metric)
            return bottle.abort(
                400,
                "Must specify stat_name in query parameters. "
                f"Must be one of: {valid_names}",
            )

        metric = stat_name_to_metric[stat_name]
        time_span_arg = bottle.request.query.time_span

        metric_time_period = {
            "24h": metrics_query.MetricTimePeriod.HOURS_24,
            "1h": metrics_query.MetricTimePeriod.HOURS_1,
            "7d": metrics_query.MetricTimePeriod.DAYS_7,
        }.get(time_span_arg, metrics_query.MetricTimePeriod.HOURS_24)

        count_with_graphs = metrics_query.get_count_with_graph(
            [metric],
            metric_time_period,
        )

        return StatResponse(
            StatsCard(
                count_with_graphs[metric].count,
                metric_time_period,
                count_with_graphs[metric].graph_data,
            )
        )

    @stats_api.get("/counts/", apply=[jsoninator])
    def aggregate_counts() -> AggregateCountResponse:
        """
        Return the set of aggregate counts (submits, hashes and matches) read
        from counts_table, keyed by count name.

        Returns 404 when this instance is not publishing metrics.
        """
        if not is_publishing_metrics():
            return bottle.abort(404, "This HMA instance is not publishing metrics.")

        PIPELINE_COUNTS_TO_SURFACE = [
            AggregateCount.PipelineNames.submits,
            AggregateCount.PipelineNames.hashes,
            AggregateCount.PipelineNames.matches,
        ]

        return AggregateCountResponse(
            {
                # NOTE(review): the int() cast presumably normalizes a
                # non-int numeric type coming back from the table — confirm.
                count_name: int(AggregateCount(count_name).get_value(counts_table))
                for count_name in PIPELINE_COUNTS_TO_SURFACE
            }
        )

    return stats_api