# hasher-matcher-actioner/hmalib/lambdas/api/submit.py
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import base64
import dataclasses
import datetime
import functools
import json
import typing as t
from dataclasses import dataclass, asdict
from uuid import uuid4

import boto3
import bottle
from botocore.exceptions import ClientError
from mypy_boto3_dynamodb.service_resource import Table
from mypy_boto3_sqs import SQSClient

from threatexchange.content_type.photo import PhotoContent
from threatexchange.content_type.content_base import ContentType
from threatexchange.signal_type.signal_base import SignalType
from threatexchange.content_type.meta import (
get_content_type_for_name,
get_signal_types_by_name,
)
from hmalib.lambdas.api.middleware import (
jsoninator,
JSONifiable,
DictParseable,
SubApp,
)
from hmalib.common.content_sources import S3BucketContentSource
from hmalib.common.models.content import ContentObject, ContentRefType
from hmalib.common.logging import get_logger
from hmalib.common.messages.submit import URLSubmissionMessage
from hmalib.common.models.pipeline import PipelineHashRecord

logger = get_logger(__name__)


@functools.lru_cache(maxsize=None)
def _get_sqs_client() -> SQSClient:
    # Cached so warm lambda invocations reuse a single boto3 client.
    return boto3.client("sqs")


def create_presigned_put_url(bucket_name, key, file_type, expiration=3600):
    """
    Generate a presigned URL with which a client can PUT an S3 object.
    """
    return create_presigned_url(bucket_name, key, file_type, expiration, "put_object")


def create_presigned_url(bucket_name, key, file_type, expiration, client_method):
"""
Generate a presigned URL to share an S3 object
"""
s3_client = boto3.client("s3")
params = {
"Bucket": bucket_name,
"Key": key,
}
if file_type:
params["ContentType"] = file_type
try:
response = s3_client.generate_presigned_url(
client_method,
Params=params,
ExpiresIn=expiration,
)
except ClientError as e:
logger.error(e)
return None
return response


# Request Objects


@dataclass
class SubmitRequestBodyBase(DictParseable):
    content_id: str
    content_type: t.Type[ContentType]
    additional_fields: t.Optional[t.List]
    force_resubmit: bool = False

    def get_content_ref_details(self) -> t.Tuple[str, ContentRefType]:
        raise NotImplementedError

    @classmethod
    def from_dict(cls, d):
        # Map incoming JSON fields onto the dataclass, then convert the
        # content_type name (e.g. "photo") into its ContentType class.
        base = cls(**{f.name: d.get(f.name, None) for f in dataclasses.fields(cls)})
        base.content_type = get_content_type_for_name(base.content_type)
        return base


@dataclass
class SubmitContents3ObjectRequestBody(SubmitRequestBodyBase):
    bucket_name: str = ""
    object_key: str = ""


@dataclass
class SubmitContentViaURLRequestBody(SubmitRequestBodyBase):
    content_url: str = ""

    def get_content_ref_details(self) -> t.Tuple[str, ContentRefType]:
        return (self.content_url, ContentRefType.URL)


@dataclass
class SubmitContentBytesRequestBody(SubmitRequestBodyBase):
    content_bytes: bytes = b""

    def get_content_ref_details(self) -> t.Tuple[str, ContentRefType]:
        # Bytes submissions are written to the default S3 bucket keyed by
        # content_id (see submit_bytes below).
        return (self.content_id, ContentRefType.DEFAULT_S3_BUCKET)


@dataclass
class SubmitContentHashRequestBody(SubmitRequestBodyBase):
    signal_value: str = ""
    signal_type: t.Union[t.Type[SignalType], str] = ""  # SignalType.get_name() values
    content_url: str = ""

    @classmethod
    def from_dict(cls, d):
        base = super().from_dict(d)
        base.signal_type = get_signal_types_by_name()[base.signal_type]
        return base

    def get_content_ref_details(self) -> t.Tuple[str, ContentRefType]:
        if self.content_url:
            return (self.content_url, ContentRefType.URL)
        return ("", ContentRefType.NONE)


@dataclass
class SubmitContentViaPutURLUploadRequestBody(SubmitRequestBodyBase):
    file_type: str = ""

    def get_content_ref_details(self) -> t.Tuple[str, ContentRefType]:
        # Treat this as an S3 submission because we expect the client to
        # upload it there directly.
        return (self.content_id, ContentRefType.DEFAULT_S3_BUCKET)


# Response Objects


@dataclass
class SubmitResponse(JSONifiable):
    content_id: str
    submit_successful: bool

    def to_json(self) -> t.Dict:
        return asdict(self)


@dataclass
class SubmitViaUploadUrlResponse(JSONifiable):
    content_id: str
    file_type: str
    presigned_url: str

    def to_json(self) -> t.Dict:
        return asdict(self)


@dataclass
class SubmitError(JSONifiable):
    """
    Warning: by default this will still return 200; you need to update
    bottle.response.status if you want a specific return code.
    TODO: update middleware.py to handle this.
    """

    content_id: str
    message: str

    def to_json(self) -> t.Dict:
        return asdict(self)


def submit_content_request_from_s3_object(
    dynamodb_table: Table,
    submissions_queue_url: str,
    bucket: str,
    key: str,
    content_id: str = "",
    content_type: t.Type[ContentType] = PhotoContent,
    additional_fields: t.Optional[t.Set] = None,
    force_resubmit: bool = False,
):
    """
    Converts an s3 event into a ContentObject and a url submission message using
    helpers from submit.py.

    For partner bucket uploads, the content IDs are unique and (somewhat)
    readable but not reversible:
    * uniqueness is provided by uuid4 (122 random bits), which makes collisions
      vanishingly unlikely
    * readability is provided by including part of the key in the content id
    * modifications to the key mean that the original content bucket and key are
      not derivable from the content ID alone

    The original content (bucket and key) is stored in the reference url, which
    is passed to the webhook via additional_fields.

    Q: Why not include the full key and bucket in content_id?
    A: Bucket keys often have "/", which doesn't work well with the
       ContentDetails UI page.
    """
readable_key = key.split("/")[-1].replace("?", ".").replace("&", ".")
if not content_id:
content_id = f"{uuid4()}-{readable_key}"
presigned_url = create_presigned_url(bucket, key, None, 3600, "get_object")
reference_url = f"https://{bucket}.s3.amazonaws.com/{key}"
    # Copy before mutating so a caller-supplied (or default) set is not shared.
    additional_fields = set(additional_fields) if additional_fields else set()
    additional_fields.update(
{
f"s3_reference_url:{reference_url}",
f"bucket_name:{bucket}",
f"object_key:{key}",
}
)
record_content_submission(
dynamodb_table,
content_id,
content_type,
content_ref=presigned_url,
content_ref_type=ContentRefType.URL,
        additional_fields=additional_fields,
        force_resubmit=force_resubmit,
)
send_submission_to_url_queue(
dynamodb_table, submissions_queue_url, content_id, content_type, presigned_url
)


def record_content_submission(
    dynamodb_table: Table,
    content_id: str,
    content_type: t.Type[ContentType],
    content_ref: str,
    content_ref_type: ContentRefType,
    additional_fields: t.Optional[t.Set] = None,
    force_resubmit: bool = False,
) -> bool:
    """
    Write a submitted content object to the dynamodb_table.

    Note: this method does not store the data of the content itself.
    If we want to store the media itself, that is done either:
    - by a client using a presigned url we give them
    - by a direct s3 put call in the case of raw bytes
    - not at all in the case of CDN-URL submission
    - (WIP: possibly after a match is found)

    This function is also called directly by api_root when handling s3 uploads
    to partner banks. If editing, ensure the logic in api_root.process_s3_event
    is still correct.

    Returns True if recording was successful.
    """
submit_time = datetime.datetime.now()
content_obj = ContentObject(
content_id=content_id,
content_type=content_type,
content_ref=content_ref,
content_ref_type=content_ref_type,
        additional_fields=additional_fields or set(),
submission_times=[submit_time], # Note: custom write_to_table impl appends.
created_at=submit_time,
updated_at=submit_time,
)
if force_resubmit:
# Allow an overwrite or resubmission of content objects
content_obj.write_to_table(dynamodb_table)
return True
return content_obj.write_to_table_if_not_found(dynamodb_table)


def send_submission_to_url_queue(
    dynamodb_table: Table,
    submissions_queue_url: str,
    content_id: str,
    content_type: t.Type[ContentType],
    url: str,
):
    """
    Send a submitted content url to the hasher. This does not store a copy of
    the content in s3.

    This function is also called directly by api_root when handling s3 uploads
    to partner banks. If editing, ensure the logic in api_root.process_s3_event
    is still correct.
    """
    url_submission_message = URLSubmissionMessage(
        content_type=content_type, content_id=content_id, url=t.cast(str, url)
    )
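    # The message shape is defined by URLSubmissionMessage.to_sqs_message;
    # the hashing lambda consumes from this queue.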
_get_sqs_client().send_message(
QueueUrl=submissions_queue_url,
MessageBody=json.dumps(url_submission_message.to_sqs_message()),
)


def get_submit_api(
    dynamodb_table: Table,
    image_bucket: str,
    image_prefix: str,
    submissions_queue_url: str,
    hash_queue_url: str,
) -> bottle.Bottle:
    """
    A closure that includes all dependencies that MUST be provided by the root
    API that this API plugs into. Declare dependencies here, but initialize in
    the root API alone.
    """
    # A prefix to all routes must be provided by the api_root app.
    # The documentation below expects the prefix to be '/submit/'.
    submit_api = SubApp()
    s3_bucket_image_source = S3BucketContentSource(image_bucket, image_prefix)
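    # Illustrative wiring from the root app (names here are assumptions; see
    # api_root for the real wiring):
    #   root_app.mount("/submit/", get_submit_api(table, bucket, prefix, submissions_q, hash_q))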

    def _content_exist_error(content_id: str):
        return bottle.abort(
            400,
            f"Content with id '{content_id}' already exists. To resubmit, include `force_resubmit=True` in the payload.",
        )

    def _record_content_submission_from_request(
        request: SubmitRequestBodyBase,
    ) -> bool:
        """
        Given a submission request object, record the content object to the
        table passed to the API using `record_content_submission`.

        Note: this method does not store the content media itself.
        """
content_ref, content_ref_type = request.get_content_ref_details()
return record_content_submission(
dynamodb_table,
content_id=request.content_id,
content_type=request.content_type,
content_ref=content_ref,
content_ref_type=content_ref_type,
additional_fields=set(request.additional_fields)
if request.additional_fields
else set(),
force_resubmit=request.force_resubmit,
)
@submit_api.post("/s3/", apply=[jsoninator(SubmitContents3ObjectRequestBody)])
def submit_s3(
request: SubmitContents3ObjectRequestBody,
) -> t.Union[SubmitResponse, SubmitError]:
"""
Submission of a s3 object of a piece of content.
"""
submit_content_request_from_s3_object(
dynamodb_table,
submissions_queue_url=submissions_queue_url,
bucket=request.bucket_name,
key=request.object_key,
content_id=request.content_id,
content_type=request.content_type,
additional_fields=set(request.additional_fields)
if request.additional_fields
else set(),
force_resubmit=request.force_resubmit,
)
return SubmitResponse(content_id=request.content_id, submit_successful=True)
@submit_api.post("/url/", apply=[jsoninator(SubmitContentViaURLRequestBody)])
def submit_url(
request: SubmitContentViaURLRequestBody,
) -> t.Union[SubmitResponse, SubmitError]:
"""
Submission via a url to content. This does not store a copy of the content in s3
"""
if not _record_content_submission_from_request(request):
return _content_exist_error(request.content_id)
send_submission_to_url_queue(
dynamodb_table,
submissions_queue_url,
request.content_id,
request.content_type,
request.content_url,
)
return SubmitResponse(content_id=request.content_id, submit_successful=True)
@submit_api.post("/bytes/", apply=[jsoninator(SubmitContentBytesRequestBody)])
def submit_bytes(
request: SubmitContentBytesRequestBody,
) -> t.Union[SubmitResponse, SubmitError]:
"""
Submit of media to HMA via a direct transfer of bytes to the system's s3 bucket.
"""
content_id = request.content_id
file_contents = base64.b64decode(request.content_bytes)
# We want to record the submission before triggering and processing on
# the content itself therefore we write to dynamodb before s3
if not _record_content_submission_from_request(request):
return _content_exist_error(request.content_id)
s3_bucket_image_source.put_image_bytes(content_id, file_contents)
return SubmitResponse(content_id=request.content_id, submit_successful=True)

    @submit_api.post(
        "/put-url/", apply=[jsoninator(SubmitContentViaPutURLUploadRequestBody)]
    )
    def submit_put_url(
        request: SubmitContentViaPutURLUploadRequestBody,
    ) -> t.Union[SubmitViaUploadUrlResponse, SubmitError]:
        """
        Submission of content to HMA in two steps:
        1st: create a content record and a presigned put url based on the
             request body.
        2nd: the client uploads to the system's s3 bucket using the put url
             returned by this method.
        """
presigned_url = create_presigned_put_url(
bucket_name=image_bucket,
key=s3_bucket_image_source.get_s3_key(request.content_id),
file_type=request.file_type,
)
if presigned_url:
if not _record_content_submission_from_request(request):
return _content_exist_error(request.content_id)
return SubmitViaUploadUrlResponse(
content_id=request.content_id,
file_type=str(request.file_type),
presigned_url=presigned_url,
)
bottle.response.status = 400
return SubmitError(
content_id=request.content_id,
message="Failed to generate upload url",
)
@submit_api.post("/hash/", apply=[jsoninator(SubmitContentHashRequestBody)])
def submit_hash(
request: SubmitContentHashRequestBody,
) -> t.Union[SubmitResponse, SubmitError]:
"""
Submission of a hash from a piece of content.
Functions the same as other submission endpoint but skips
the hasher and media storage.
"""
        # Record the content object (even though, as with url submissions, we
        # don't store the content itself).
if not _record_content_submission_from_request(request):
return _content_exist_error(request.content_id)
# Record hash
        # TODO: expand the submit hash API to include `signal_specific_attributes`.
hash_record = PipelineHashRecord(
content_id=request.content_id,
signal_type=t.cast(t.Type[SignalType], request.signal_type),
content_hash=request.signal_value,
updated_at=datetime.datetime.now(),
)
hash_record.write_to_table(dynamodb_table)
# Send hash directly to matcher
        # TODO: this could maybe try to reuse the methods in UnifiedHasher in #749.
_get_sqs_client().send_message(
QueueUrl=hash_queue_url,
MessageBody=json.dumps(hash_record.to_sqs_message()),
)
return SubmitResponse(content_id=request.content_id, submit_successful=True)

    return submit_api