hasher-matcher-actioner/hmalib/lambdas/fetcher.py
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
"""
Implementation of the "fetcher" module of HMA.
Fetching involves connecting to the ThreatExchange API and downloading
signals to synchronize a local copy of the database, which will then
be fed into various indices.
"""
import time
import os
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from functools import lru_cache

import boto3

from threatexchange.api import ThreatExchangeAPI
from threatexchange.signal_type.pdq import PdqSignal
from threatexchange.signal_type.md5 import VideoMD5Signal
from threatexchange.threat_updates import ThreatUpdateJSON

from hmalib.aws_secrets import AWSSecrets
from hmalib.common.config import HMAConfig
from hmalib.common.logging import get_logger
from hmalib.common.configs.fetcher import ThreatExchangeConfig
from hmalib.common.s3_adapters import ThreatUpdateS3Store

logger = get_logger(__name__)
dynamodb = boto3.resource("dynamodb")

# In one fetcher run, how many descriptor updates to fetch per privacy group
# using /threat_updates
MAX_DESCRIPTORS_UPDATED = 20000

# Log progress while polling /threat_updates at most once every this many seconds
PROGRESS_PRINT_INTERVAL_SEC = 20
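

# boto3 clients are comparatively expensive to construct; caching lets a warm
# Lambda container reuse a single client across invocations.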
@lru_cache(maxsize=None)
def get_s3_client():
return boto3.client("s3")


# Lambda init tricks
@lru_cache(maxsize=1)
def lambda_init_once():
    """
    Do some late initialization for required lambda components.

    Lambda initialization is weird: despite the existence of perfectly
    good constructions like __name__ == "__main__", there doesn't appear
    to be an easy way to split your lambda-specific logic from your
    module logic, except by splitting up the files and making your
    lambda entry as small as possible.

    TODO: Just refactor this file to separate the lambda and functional
    components.
    """
cfg = FetcherConfig.get()
HMAConfig.initialize(cfg.config_table_name)


@dataclass
class FetcherConfig:
    """
    Simple holder for getting typed environment variables.
    """
s3_bucket: str
s3_te_data_folder: str
config_table_name: str
data_store_table: str

    @classmethod
    @lru_cache(maxsize=None)  # probably overkill, but at least it's consistent
    def get(cls):
        # Values come straight from the environment; tests can override them by
        # setting these variables before the first (cached) call.
return cls(
s3_bucket=os.environ["THREAT_EXCHANGE_DATA_BUCKET_NAME"],
s3_te_data_folder=os.environ["THREAT_EXCHANGE_DATA_FOLDER"],
config_table_name=os.environ["CONFIG_TABLE_NAME"],
data_store_table=os.environ["DYNAMODB_DATASTORE_TABLE"],
)
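

# Illustrative usage (the environment variable names above are the real ones;
# the values here are made up):
#
#   os.environ["THREAT_EXCHANGE_DATA_BUCKET_NAME"] = "my-hma-te-data"
#   os.environ["THREAT_EXCHANGE_DATA_FOLDER"] = "threat_exchange_data/"
#   os.environ["CONFIG_TABLE_NAME"] = "HMAConfig"
#   os.environ["DYNAMODB_DATASTORE_TABLE"] = "HMADataStore"
#   cfg = FetcherConfig.get()  # cached: later calls return the same instance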


def is_int(int_string: str) -> bool:
    """
    Checks whether a string is convertible to an int.
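
    Doctest-style examples (illustrative values):

    >>> is_int("303636684709969")
    True
    >>> is_int("not-an-id")
    False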
"""
try:
int(int_string)
return True
except ValueError:
return False


class ProgressLogger:
    """
    Counts the number of items processed via /threat_updates, logging progress
    at most once every PROGRESS_PRINT_INTERVAL_SEC seconds. Instances are
    callable, so they can be passed anywhere a progress callback is expected.
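
    Usage (exactly as in lambda_handler below):

        delta.incremental_sync_from_threatexchange(
            api, limit=MAX_DESCRIPTORS_UPDATED, progress_fn=ProgressLogger()
        )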
"""

    def __init__(self):
self.processed = 0
self.last_update_time = None
        self.counts = defaultdict(int)
self.last_update_printed = 0

    def __call__(self, update: ThreatUpdateJSON):
self.processed += 1
self.counts[update.threat_type] += -1 if update.should_delete else 1
self.last_update_time = update.time
now = time.time()
if now - self.last_update_printed >= PROGRESS_PRINT_INTERVAL_SEC:
self.last_update_printed = now
logger.info("threat_updates/: processed %d descriptors.", self.processed)


def lambda_handler(_event, _context):
    """
    Run through ThreatExchange privacy groups and fetch updates to each. If
    this is the first fetch for a privacy group, fetch from the beginning of
    time; otherwise fetch only the updates since the last checkpoint.

    Note: since this is a scheduled job, we swallow all exceptions; we only
    log them and move on.
    """
lambda_init_once()
config = FetcherConfig.get()
collabs = ThreatExchangeConfig.get_all()
now = datetime.now()
current_time = now.strftime("%H:%M:%S")
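    # Log at most five collab names, overwriting the fifth with "..." when the
    # list is longer.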
names = [collab.privacy_group_name for collab in collabs[:5]]
if len(names) < len(collabs):
names[-1] = "..."
data = f"Triggered at time {current_time}, found {len(collabs)} collabs: {', '.join(names)}"
logger.info(data)
api_token = AWSSecrets().te_api_token()
api = ThreatExchangeAPI(api_token)
for collab in collabs:
logger.info(
"Processing updates for collaboration %s", collab.privacy_group_name
)
if not is_int(collab.privacy_group_id):
logger.info(
f"Fetch skipped because privacy_group_id({collab.privacy_group_id}) is not an int"
)
continue
        if not collab.fetcher_active:
            logger.info(
                f"Fetch skipped because config has `fetcher_active` set to false for privacy_group_id({collab.privacy_group_id})"
            )
            continue
indicator_store = ThreatUpdateS3Store(
int(collab.privacy_group_id),
api.app_id,
s3_client=get_s3_client(),
s3_bucket_name=config.s3_bucket,
s3_te_data_folder=config.s3_te_data_folder,
data_store_table=config.data_store_table,
supported_signal_types=[VideoMD5Signal, PdqSignal],
)
        # `delta` must exist for the bookkeeping after the try block, even if
        # we fail before producing one.
        delta = None
        try:
indicator_store.load_checkpoint()
if indicator_store.stale:
logger.warning(
"Store for %s - %d stale! Resetting.",
collab.privacy_group_name,
int(collab.privacy_group_id),
)
indicator_store.reset()
            if indicator_store.fetch_checkpoint >= now.timestamp():
                # Checkpoint is already at or past this run's start time;
                # nothing new to fetch for this privacy group.
                continue
delta = indicator_store.next_delta
delta.incremental_sync_from_threatexchange(
api, limit=MAX_DESCRIPTORS_UPDATED, progress_fn=ProgressLogger()
)
        except Exception:  # pylint: disable=broad-except
            logger.exception(
                "Encountered exception while getting updates. Will attempt to save partial progress.."
            )
            if delta:
                # Force the delta to show finished so the partial fetch is saved
                delta.end = delta.current

        if delta:
            logger.info("Fetch complete, applying %d updates", len(delta.updates))
            indicator_store.apply_updates(
                delta, post_apply_fn=indicator_store.post_apply
            )
        else:
            logger.error("Failed before fetching any records")
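

# A minimal local smoke-test sketch (not part of the deployed lambda): it
# assumes the four environment variables read by FetcherConfig, AWS
# credentials, and a ThreatExchange API token secret are available locally.
# lambda_handler ignores its arguments, so None placeholders are fine.
if __name__ == "__main__":
    lambda_handler(None, None)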