hasher-matcher-actioner/hmalib/lambdas/api/datasets.py

# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved

import bottle
import typing as t
from dataclasses import dataclass, asdict

from mypy_boto3_dynamodb.service_resource import Table

from hmalib import metrics
from hmalib.aws_secrets import AWSSecrets
from hmalib.common.config import HMAConfig
from hmalib.common import config as hmaconfig
from hmalib.common.s3_adapters import ThreatExchangeS3PDQAdapter, S3ThreatDataConfig
from hmalib.common.configs.fetcher import (
    ThreatExchangeConfig,
    AdditionalMatchSettingsConfig,
)
from hmalib.common.threatexchange_config import (
    create_privacy_group_if_not_exists,
    sync_privacy_groups,
    try_api_token,
)
from hmalib.lambdas.api.middleware import (
    jsoninator,
    JSONifiable,
    DictParseable,
    SubApp,
)


@dataclass
class Dataset(JSONifiable):
    privacy_group_id: t.Union[int, str]
    privacy_group_name: str
    description: str
    fetcher_active: bool
    matcher_active: bool
    write_back: bool
    in_use: bool

    def to_json(self) -> t.Dict:
        return asdict(self)

    @classmethod
    def from_dict(cls, d: dict) -> "Dataset":
        return cls(
            d["privacy_group_id"],
            d["privacy_group_name"],
            d["description"],
            d["fetcher_active"],
            d["matcher_active"],
            d["write_back"],
            d["in_use"],
        )

    @classmethod
    def from_collab(cls, collab: ThreatExchangeConfig) -> "Dataset":
        return cls(
            collab.privacy_group_id,
            collab.privacy_group_name,
            collab.description,
            collab.fetcher_active,
            collab.matcher_active,
            collab.write_back,
            collab.in_use,
        )


@dataclass
class SyncDatasetResponse(JSONifiable):
    response: str

    def to_json(self) -> t.Dict:
        return asdict(self)


@dataclass
class DeleteDatasetResponse(JSONifiable):
    response: str

    def to_json(self) -> t.Dict:
        return asdict(self)


@dataclass
class UpdateDatasetRequest(DictParseable):
    privacy_group_id: t.Union[int, str]
    fetcher_active: bool
    matcher_active: bool
    write_back: bool
    pdq_match_threshold: str

    @classmethod
    def from_dict(cls, d: dict) -> "UpdateDatasetRequest":
        return cls(
            d["privacy_group_id"],
            d["fetcher_active"],
            d["matcher_active"],
            d["write_back"],
            d.get("pdq_match_threshold", ""),
        )


@dataclass
class CreateDatasetRequest(DictParseable):
    privacy_group_id: t.Union[int, str]
    privacy_group_name: str
    description: str
    fetcher_active: bool
    matcher_active: bool
    write_back: bool

    @classmethod
    def from_dict(cls, d: dict) -> "CreateDatasetRequest":
        return cls(
            d["privacy_group_id"],
            d["privacy_group_name"],
            d["description"],
            d["fetcher_active"],
            d["matcher_active"],
            d["write_back"],
        )


@dataclass
class CreateDatasetResponse(JSONifiable):
    response: str

    def to_json(self) -> t.Dict:
        return asdict(self)
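
# Illustrative JSON body for the /update endpoint defined further below,
# matching UpdateDatasetRequest.from_dict (the id and values are made up;
# pdq_match_threshold is optional and may be omitted or sent as ""):
#
#   {
#       "privacy_group_id": "303636684709969",
#       "fetcher_active": true,
#       "matcher_active": true,
#       "write_back": false,
#       "pdq_match_threshold": "31"
#   }
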
""" hash_count: int match_count: int pdq_match_threshold: t.Optional[str] def to_json(self) -> t.Dict: dataset_json = super().to_json() dataset_json.update( hash_count=self.hash_count, match_count=self.match_count, pdq_match_threshold=self.pdq_match_threshold, ) return dataset_json @dataclass class DatasetSummariesResponse(JSONifiable): threat_exchange_datasets: t.List[ThreatExchangeDatasetSummary] def to_json(self) -> t.Dict: return { "threat_exchange_datasets": [ dataset.to_json() for dataset in self.threat_exchange_datasets ] } @dataclass class MatchSettingsResponseBody(JSONifiable): config: AdditionalMatchSettingsConfig def to_json(self) -> t.Dict: return { "privacy_group_id": self.config.name, "pdq_match_threshold": self.config.pdq_match_threshold, } @dataclass class MatchSettingsResponse(JSONifiable): match_settings: t.List[MatchSettingsResponseBody] def to_json(self) -> t.Dict: return { "match_settings": [settings.to_json() for settings in self.match_settings] } @dataclass class MatchSettingsUpdateRequest(DictParseable): privacy_group_id: str pdq_match_threshold: int @classmethod def from_dict(cls, d: dict) -> "MatchSettingsUpdateRequest": return cls( d["privacy_group_id"], d["pdq_match_threshold"], ) @dataclass class MatchSettingsUpdateResponse(JSONifiable): response: str def to_json(self) -> t.Dict: return asdict(self) def _get_signal_hash_count_and_last_modified( threat_exchange_data_bucket_name: str, threat_exchange_data_folder: str, ) -> t.Dict[str, t.Tuple[int, str]]: # TODO this method is expensive some cache or memoization method might be a good idea. s3_config = S3ThreatDataConfig( threat_exchange_data_bucket_name=threat_exchange_data_bucket_name, threat_exchange_data_folder=threat_exchange_data_folder, ) pdq_storage = ThreatExchangeS3PDQAdapter( config=s3_config, metrics_logger=metrics.names.api_hash_count() ) pdq_data_files = pdq_storage.load_data() return { file_name.split("/")[-1].split(".")[0]: ( len(rows), pdq_storage.last_modified[file_name], ) for file_name, rows in pdq_data_files.items() } def _get_threat_exchange_datasets( table: Table, threat_exchange_data_bucket_name: str, threat_exchange_data_folder: str, ) -> t.List[ThreatExchangeDatasetSummary]: collaborations = ThreatExchangeConfig.get_all() hash_counts: t.Dict[ str, t.Tuple[int, str] ] = _get_signal_hash_count_and_last_modified( threat_exchange_data_bucket_name, threat_exchange_data_folder, ) summaries = [] for collab in collaborations: if additional_config := AdditionalMatchSettingsConfig.get( str(collab.privacy_group_id) ): pdq_match_threshold = str(additional_config.pdq_match_threshold) else: pdq_match_threshold = "" summaries.append( ThreatExchangeDatasetSummary( collab.privacy_group_id, collab.privacy_group_name, collab.description, collab.fetcher_active, collab.matcher_active, collab.write_back, collab.in_use, hash_count=t.cast( int, hash_counts.get( collab.privacy_group_id, [-1, ""], )[0], ), match_count=-1, # fix will be based on new count system pdq_match_threshold=pdq_match_threshold, ) ) return summaries def get_datasets_api( hma_config_table: str, datastore_table: Table, threat_exchange_data_bucket_name: str, threat_exchange_data_folder: str, ) -> bottle.Bottle: """ ToDo / FixMe: this file is probably more about privacy groups than datasets... 
""" # The documentation below expects prefix to be '/datasets/' datasets_api = SubApp() HMAConfig.initialize(hma_config_table) @datasets_api.get("/", apply=[jsoninator]) def get_all_dataset_summaries() -> DatasetSummariesResponse: """ Returns summaries for all datasets. Summary includes all facts that are not configurable. Eg. its name, the number of hashes it has, the number of matches it has caused, etc. """ return DatasetSummariesResponse( threat_exchange_datasets=_get_threat_exchange_datasets( datastore_table, threat_exchange_data_bucket_name, threat_exchange_data_folder, ) ) @datasets_api.post("/update", apply=[jsoninator(UpdateDatasetRequest)]) def update_dataset(request: UpdateDatasetRequest) -> Dataset: """ Update dataset values: fetcher_active, write_back, and matcher_active. """ config = ThreatExchangeConfig.getx(str(request.privacy_group_id)) config.fetcher_active = request.fetcher_active config.write_back = request.write_back config.matcher_active = request.matcher_active updated_config = hmaconfig.update_config(config).__dict__ updated_config["privacy_group_id"] = updated_config["name"] additional_config = AdditionalMatchSettingsConfig.get( str(request.privacy_group_id) ) if request.pdq_match_threshold: if additional_config: additional_config.pdq_match_threshold = int(request.pdq_match_threshold) hmaconfig.update_config(additional_config) else: additional_config = AdditionalMatchSettingsConfig( str(request.privacy_group_id), int(request.pdq_match_threshold) ) hmaconfig.create_config(additional_config) elif additional_config: # pdq_match_threshold was set and now should be removed hmaconfig.delete_config(additional_config) return Dataset.from_dict(updated_config) @datasets_api.post("/create", apply=[jsoninator(CreateDatasetRequest)]) def create_dataset(request: CreateDatasetRequest) -> CreateDatasetResponse: """ Create a local dataset (defaults defined in CreateDatasetRequest) """ assert isinstance(request, CreateDatasetRequest) create_privacy_group_if_not_exists( privacy_group_id=str(request.privacy_group_id), privacy_group_name=request.privacy_group_name, description=request.description, in_use=True, fetcher_active=request.fetcher_active, matcher_active=request.matcher_active, write_back=request.write_back, ) return CreateDatasetResponse( response=f"Created dataset {request.privacy_group_id}" ) @datasets_api.post("/sync", apply=[jsoninator]) def sync_datasets() -> SyncDatasetResponse: """ Fetch new collaborations from ThreatExchange and sync with the configs stored in DynamoDB. 
""" sync_privacy_groups() return SyncDatasetResponse(response="Privacy groups are up to date") @datasets_api.post("/delete/<key>", apply=[jsoninator]) def delete_dataset(key=None) -> DeleteDatasetResponse: """ Delete the dataset with key=<key> """ config = ThreatExchangeConfig.getx(str(key)) hmaconfig.delete_config(config) return DeleteDatasetResponse(response="The privacy group is deleted") @datasets_api.get("/match-settings", apply=[jsoninator]) def get_all_match_settings() -> MatchSettingsResponse: """ Return all match settings configs """ return MatchSettingsResponse( match_settings=[ MatchSettingsResponseBody(c) for c in AdditionalMatchSettingsConfig.get_all() ] ) @datasets_api.get("/match-settings/<key>", apply=[jsoninator]) def get_match_settings( key=None, ) -> MatchSettingsResponseBody: """ Return a match settings config for a given privacy_group_id """ if config := AdditionalMatchSettingsConfig.get(str(key)): return MatchSettingsResponseBody(config) return bottle.abort(400, f"No match_settings for pg_id {key} found") @datasets_api.post( "/match-settings", apply=[jsoninator(MatchSettingsUpdateRequest)] ) def create_or_update_match_settings( request: MatchSettingsUpdateRequest, ) -> MatchSettingsUpdateResponse: """ Create or update a match settings config for a given privacy_group_id """ if config := AdditionalMatchSettingsConfig.get(request.privacy_group_id): config.pdq_match_threshold = request.pdq_match_threshold hmaconfig.update_config(config) return MatchSettingsUpdateResponse( f"match_settings updated for pg_id {request.privacy_group_id} with pdq_match_threshold={request.pdq_match_threshold}" ) config = AdditionalMatchSettingsConfig( request.privacy_group_id, request.pdq_match_threshold ) hmaconfig.create_config(config) return MatchSettingsUpdateResponse( f"match_settings created for pg_id {request.privacy_group_id} with pdq_match_threshold={request.pdq_match_threshold}" ) @datasets_api.delete("/match-settings/<key>", apply=[jsoninator]) def delete_match_settings( key=None, ) -> MatchSettingsUpdateResponse: """ Delete a match settings config for a given privacy_group_id """ if config := AdditionalMatchSettingsConfig.get(str(key)): hmaconfig.delete_config(config) return MatchSettingsUpdateResponse( f"match_settings deleted for pg_id {key}" ) return bottle.abort(400, f"No match_settings for pg_id {key} found") @datasets_api.post("/update-threatexchange-token") def update_threatexchange_token() -> t.Dict: """ Given a new threatexchange token as part of the request, makes dummy call to threatexchange to see if it is a valid token. If found valid, updates the secret. Returns 200 if successful, 400 if the token is invalid. Response body is always and empty JSON object. Expected JSON Keys: * `token`: the threatexchange token """ token = bottle.request.json["token"] is_valid_token = try_api_token(token) if is_valid_token: AWSSecrets().update_te_api_token(token) return {} bottle.response.status_code = 400 return {} return datasets_api