hasher-matcher-actioner/hmalib/lambdas/api/content.py (176 lines of code) (raw):

# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved

from threatexchange import signal_type
from hmalib.lambdas.api.submit import create_presigned_url
import bottle
import boto3
from datetime import datetime
from dataclasses import dataclass, asdict, field
from mypy_boto3_dynamodb.service_resource import Table
from boto3.dynamodb.conditions import Attr, Key, Or
from botocore.exceptions import ClientError

import typing as t

from threatexchange.signal_type.signal_base import SignalType
from threatexchange.content_type.content_base import ContentType
from hmalib.lambdas.api.middleware import (
    jsoninator,
    JSONifiable,
    DictParseable,
    SubApp,
)
from hmalib.common.models.pipeline import MatchRecord, PipelineHashRecord
from hmalib.common.models.content import (
    ContentObject,
    ActionEvent,
    ContentRefType,
)
from hmalib.common.content_sources import S3BucketContentSource
from hmalib.common.logging import get_logger

logger = get_logger(__name__)
s3_client = boto3.client("s3")
dynamodb = boto3.resource("dynamodb")


def _isoformat_or_none(moment: t.Optional[datetime]) -> t.Optional[str]:
    """Return ``moment.isoformat()``, or None when ``moment`` is None."""
    return None if moment is None else moment.isoformat()


@dataclass
class HashResultResponse(JSONifiable):
    """Response body of the hash endpoint: one hash for one content_id."""

    content_id: str
    content_hash: str
    # ISO-8601 string; the hash endpoint fills it from `updated_at.isoformat()`.
    updated_at: str

    def to_json(self) -> t.Dict:
        """Return the dataclass fields as a plain dict."""
        return asdict(self)


@dataclass
class ContentPreviewResponse(JSONifiable):
    """Response body of the preview-url endpoint."""

    preview_url: str

    def to_json(self) -> t.Dict:
        """Return the dataclass fields as a plain dict."""
        return asdict(self)


@dataclass
class ContentPipelineProgress(JSONifiable):
    """
    Encompasses optional results for all stages of the pipeline. Includes
    results from any of those stages and the time those stages were updated.
    """

    content_id: str
    content_type: t.Type[ContentType]
    content_preview_url: str

    # Stage update times
    submitted_at: t.Optional[datetime]
    hashed_at: t.Optional[datetime] = None
    matched_at: t.Optional[datetime] = None
    action_evaluated_at: t.Optional[datetime] = None
    action_performed_at: t.Optional[datetime] = None

    # Stage update results
    submission_additional_fields: t.List[str] = field(default_factory=list)

    # map from signal_type string => hash value
    hash_results: t.Dict[str, str] = field(default_factory=dict)

    # map from signal_id => list of classification strings
    match_results: t.Dict[str, t.List[str]] = field(default_factory=dict)

    # list of action names that must be performed
    action_evaluation_results: t.List[str] = field(default_factory=list)

    # list of action names that have been performed.
    action_perform_results: t.List[str] = field(default_factory=list)

    def to_json(self) -> t.Dict:
        """
        Return the fields as a dict, replacing the content type class with its
        name and every stage timestamp with its ISO-8601 string (or None when
        the stage has no timestamp).
        """
        result = asdict(self)
        result.update(
            content_type=self.content_type.get_name(),
            submitted_at=_isoformat_or_none(self.submitted_at),
            hashed_at=_isoformat_or_none(self.hashed_at),
            matched_at=_isoformat_or_none(self.matched_at),
            action_evaluated_at=_isoformat_or_none(self.action_evaluated_at),
            action_performed_at=_isoformat_or_none(self.action_performed_at),
        )
        return result


@dataclass
class ActionHistoryResponse(JSONifiable):
    """Response body of the action-history endpoint."""

    action_events: t.List[ActionEvent] = field(default_factory=list)

    def to_json(self) -> t.Dict:
        """Return ``{"action_history": [...]}`` with each event serialized."""
        return {"action_history": [record.to_json() for record in self.action_events]}


def get_content_api(
    dynamodb_table: Table, image_bucket: str, image_prefix: str
) -> bottle.Bottle:
    """
    A Closure that includes all dependencies that MUST be provided by the root
    API that this API plugs into. Declare dependencies here, but initialize in
    the root API alone.

    Returns the sub-application with the content routes registered on it.
    """

    def get_preview_url(content_id: str, content_object: ContentObject) -> str:
        """
        Given a content_id and a content_object, returns a URL you can use to
        preview it.

        Aborts the request with a 400 when no URL can be determined.
        """
        preview_url = ""

        if content_object.content_ref_type == ContentRefType.DEFAULT_S3_BUCKET:
            source = S3BucketContentSource(image_bucket, image_prefix)
            # NOTE(review): 3600 is presumably the URL lifetime in seconds;
            # confirm against create_presigned_url.
            preview_url = create_presigned_url(
                image_bucket, source.get_s3_key(content_id), None, 3600, "get_object"
            )
        elif content_object.content_ref_type == ContentRefType.URL:
            preview_url = content_object.content_ref

        if not preview_url:
            return bottle.abort(400, "preview_url not found.")
        return preview_url

    # A prefix to all routes must be provided by the api_root app
    # The documentation below expects prefix to be '/content/'
    content_api = SubApp()

    @content_api.get("/", apply=[jsoninator])
    def content() -> t.Optional[ContentObject]:
        """
        Return content object for given ID.

        Returns None when no content_id query parameter is supplied.
        """
        content_id = bottle.request.query.content_id or None
        if content_id:
            return ContentObject.get_from_content_id(dynamodb_table, content_id)
        return None

    @content_api.get("/pipeline-progress/", apply=[jsoninator])
    def pipeline_progress() -> ContentPipelineProgress:
        """
        WARNING: UNOPTIMIZED. DO NOT CALL FROM AUTOMATED SYSTEMS.

        Build a history of the stages that this piece of content has gone
        through and what their results were. Do not call this from anything but
        a UI. This is not optimized for performance.

        Aborts with 400 when content_id is missing or unknown, and with 500
        when more than one hash record exists for the same signal type.
        """
        content_id = bottle.request.query.content_id or None
        if not content_id:
            return bottle.abort(400, "content_id must be provided.")
        content_id = t.cast(str, content_id)

        content_object = ContentObject.get_from_content_id(dynamodb_table, content_id)
        if not content_object:
            # NOTE(review): the preview-url route answers 404 for the same
            # condition; the 400 is kept here so existing callers see no change.
            return bottle.abort(400, f"Content with id '{content_id}' not found.")
        content_object = t.cast(ContentObject, content_object)

        preview_url = get_preview_url(content_id, content_object)

        # The result object will be gradually built up as records are retrieved.
        result = ContentPipelineProgress(
            content_id=content_id,
            content_type=content_object.content_type,
            content_preview_url=preview_url,
            submitted_at=content_object.updated_at,
            submission_additional_fields=list(content_object.additional_fields),
        )

        hash_records = PipelineHashRecord.get_from_content_id(
            dynamodb_table, content_id
        )
        if hash_records:
            result.hashed_at = max(record.updated_at for record in hash_records)
            for hash_record in hash_records:
                signal_name = hash_record.signal_type.get_name()
                # Assume that each signal type has a single hash
                if signal_name in result.hash_results:
                    return bottle.abort(
                        500,
                        f"Content with id '{content_id}' has multiple hash records for signal-type: '{signal_name}'.",
                    )
                result.hash_results[signal_name] = hash_record.content_hash

        match_records = MatchRecord.get_from_content_id(dynamodb_table, content_id)
        if match_records:
            result.matched_at = max(record.updated_at for record in match_records)

            # TODO #751 Until we resolve type agnostic storage of signal data,
            # we can't populate match details.
            # actually populate result.match_results.

        # TODO: ActionEvaluation does not yet leave a trail. Either record
        # action evaluation or remove the evaluation stage from the
        # pipeline-progress indicator.

        action_records = ActionEvent.get_from_content_id(dynamodb_table, content_id)
        if action_records:
            result.action_performed_at = max(
                record.performed_at for record in action_records
            )
            result.action_perform_results = [r.action_label for r in action_records]

        return result

    @content_api.get("/action-history/", apply=[jsoninator])
    def action_history() -> ActionHistoryResponse:
        """
        Return list of action event records for a given ID.

        Returns an empty history when no content_id query parameter is supplied.
        """
        if content_id := bottle.request.query.content_id or None:
            return ActionHistoryResponse(
                ActionEvent.get_from_content_id(dynamodb_table, f"{content_id}")
            )
        return ActionHistoryResponse()

    @content_api.get("/hash/", apply=[jsoninator])
    def hashes() -> t.Optional[HashResultResponse]:
        """
        Return the hash details for a given ID.

        Returns None when no content_id is supplied or when the content has no
        hash record.
        """
        content_id = bottle.request.query.content_id or None
        if not content_id:
            return None

        records = PipelineHashRecord.get_from_content_id(
            dynamodb_table, f"{content_id}"
        )
        # Check for emptiness before indexing: content without a hash record
        # must yield None rather than raise IndexError.
        if not records:
            return None

        # FIXME: Presently, hash API can only support one hash per content_id
        record = records[0]
        if not record:
            return None

        return HashResultResponse(
            content_id=record.content_id,
            content_hash=record.content_hash,
            updated_at=record.updated_at.isoformat(),
        )

    @content_api.get("/preview-url/", apply=[jsoninator])
    def image() -> ContentPreviewResponse:
        """
        Return a URL to the submitted media for a given ID.

        If a URL was submitted, it is returned; otherwise a signed URL is
        created for s3 uploads. Also works for videos.

        Aborts with 400 when content_id is missing and 404 when it is unknown.
        """
        content_id = bottle.request.query.content_id or None
        if not content_id:
            return bottle.abort(400, "content_id must be provided.")

        content_object: t.Optional[ContentObject] = ContentObject.get_from_content_id(
            table=dynamodb_table, content_id=content_id
        )
        if not content_object:
            return bottle.abort(404, "content_id does not exist.")

        preview_url = get_preview_url(content_id, content_object)
        return ContentPreviewResponse(preview_url)

    return content_api