hasher-matcher-actioner/hmalib/lambdas/api/bank.py (194 lines of code) (raw):
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
from functools import lru_cache
from datetime import datetime
import json
import uuid
import bottle
import typing as t
from dataclasses import asdict, dataclass, field
from urllib.parse import quote as uriencode
import boto3
from mypy_boto3_dynamodb.service_resource import Table
from threatexchange.content_type.meta import (
get_content_type_for_name,
get_signal_types_by_name,
)
from threatexchange.content_type.photo import PhotoContent
from threatexchange.content_type.video import VideoContent
from hmalib.common.models.bank import Bank, BankMember, BanksTable, BankMemberSignal
from hmalib.banks import bank_operations as bank_ops
from hmalib.lambdas.api.middleware import (
jsoninator,
JSONifiable,
SubApp,
)
from hmalib.lambdas.api.submit import create_presigned_put_url, create_presigned_url
@dataclass
class AllBanksEnvelope(JSONifiable):
banks: t.List[Bank]
def to_json(self) -> t.Dict:
return {"banks": [bank.to_json() for bank in self.banks]}
@dataclass
class PreviewableBankMember(BankMember):
"""
A bank-member, but has a preview_url. preview_url should be pre-authorized,
and of the same content_type as the original media.
"""
preview_url: str = field(default_factory=lambda: "")
@dataclass
class BankMembersPage(JSONifiable):
bank_members: t.List[PreviewableBankMember]
# deserializes to dynamo's exclusive_start_key. Is a dict
continuation_token: t.Optional[str]
def to_json(self) -> t.Dict:
result = asdict(self)
result.update(bank_members=[member.to_json() for member in self.bank_members])
return result
@dataclass
class PreviewableBankMemberWithSignals(PreviewableBankMember):
signals: t.List[BankMemberSignal] = field(default_factory=list)
def to_json(self) -> t.Dict:
result = super().to_json()
result.update(signals=[signal.to_json() for signal in self.signals])
return result
def with_preview_url(bank_member: BankMember) -> PreviewableBankMember:
previewable = PreviewableBankMember(**asdict(bank_member))
if bank_member.storage_bucket is None:
return previewable
previewable.preview_url = create_presigned_url(
bucket_name=bank_member.storage_bucket,
key=bank_member.storage_key,
file_type=None,
expiration=300,
client_method="get_object",
)
return previewable
def with_preview_urls(
bank_members: t.List[BankMember],
) -> t.List[PreviewableBankMember]:
"""
For a list of bank_members, converts the storage details into a publicly
visible image for UI to work with.
"""
return list(map(with_preview_url, bank_members))
@lru_cache(maxsize=None)
def _get_sqs_client():
return boto3.client("sqs")
def get_bank_api(
bank_table: Table, bank_user_media_bucket: str, submissions_queue_url: str
) -> bottle.Bottle:
"""
Closure for dependencies of the bank API
"""
bank_api = SubApp()
table_manager = BanksTable(table=bank_table)
# Bank Management
@bank_api.get("/get-all-banks", apply=[jsoninator])
def get_all_banks() -> AllBanksEnvelope:
"""
Get all banks.
"""
return AllBanksEnvelope(banks=table_manager.get_all_banks())
@bank_api.get("/get-bank/<bank_id>", apply=[jsoninator])
def get_bank(bank_id=None) -> Bank:
"""
Get a specific bank from a bank_id.
"""
bank = table_manager.get_bank(bank_id=bank_id)
return bank
@bank_api.post("/create-bank", apply=[jsoninator])
def create_bank() -> Bank:
"""
Create a bank using only the name, description and an is_active flag,
and optionally tags.
"""
return table_manager.create_bank(
bank_name=bottle.request.json["bank_name"],
bank_description=bottle.request.json["bank_description"],
is_active=bottle.request.json["is_active"],
bank_tags=set(bottle.request.json["bank_tags"]),
)
@bank_api.post("/update-bank/<bank_id>", apply=[jsoninator])
def update_bank(bank_id=None) -> Bank:
"""
Update name and description for a bank_id.
"""
return table_manager.update_bank(
bank_id=bank_id,
bank_name=bottle.request.json["bank_name"],
bank_description=bottle.request.json["bank_description"],
is_active=bottle.request.json["is_active"],
bank_tags=set(bottle.request.json["bank_tags"]),
)
# Member Management
@bank_api.get("/get-members/<bank_id>", apply=[jsoninator])
def get_members(bank_id=None) -> BankMembersPage:
"""
Get a page of bank members. Use the "continuation_token" from this
response to get subsequent pages.
"""
continuation_token = (
bottle.request.query.continuation_token
and json.loads(bottle.request.query.continuation_token)
or None
)
try:
content_type = get_content_type_for_name(bottle.request.query.content_type)
except:
bottle.abort(400, "content_type must be provided as a query parameter.")
db_response = table_manager.get_all_bank_members_page(
bank_id=bank_id,
content_type=content_type,
exclusive_start_key=continuation_token,
)
continuation_token = None
if db_response.last_evaluated_key:
continuation_token = uriencode(json.dumps(db_response.last_evaluated_key))
return BankMembersPage(
bank_members=with_preview_urls(db_response.items),
continuation_token=continuation_token,
)
@bank_api.post("/add-member/<bank_id>", apply=[jsoninator])
def add_member(bank_id=None) -> PreviewableBankMember:
"""
Add a bank member. Expects a JSON object with following fields:
- content_type: ["photo"|"video"]
- storage_bucket: s3bucket for the media
- storage_key: key for the media on s3
- notes: String, any additional notes you want to associate with this
member.
Clients would want to use get_media_upload_url() to get a
storage_bucket, storage_key and a upload_url before using add_member()
Returns 200 OK with the resulting bank_member. 500 on failure.
"""
content_type = get_content_type_for_name(bottle.request.json["content_type"])
storage_bucket = bottle.request.json["storage_bucket"]
storage_key = bottle.request.json["storage_key"]
notes = bottle.request.json["notes"]
bank_member_tags = set(bottle.request.json["bank_member_tags"])
return with_preview_url(
bank_ops.add_bank_member(
banks_table=table_manager,
sqs_client=_get_sqs_client(),
submissions_queue_url=submissions_queue_url,
bank_id=bank_id,
content_type=content_type,
storage_bucket=storage_bucket,
storage_key=storage_key,
raw_content=None,
notes=notes,
bank_member_tags=bank_member_tags,
)
)
@bank_api.post("/add-detached-member-signal/<bank_id>", apply=[jsoninator])
def add_detached_bank_member_signal(bank_id=None) -> BankMemberSignal:
"""
Add a virtual bank_member (without any associated media) and a
corresponding signal.
Requires JSON object with following fields:
- signal_type: ["pdq"|"pdq_ocr","photo_md5"] -> anything from
threatexchange.content_type.meta.get_signal_types_by_name()'s keys
- content_type: ["photo"|"video"] to get the content_type for the
virtual member.
- signal_value: the hash to store against this signal. Will
automatically de-dupe against existing signals.
"""
content_type = get_content_type_for_name(bottle.request.json["content_type"])
signal_type = get_signal_types_by_name()[bottle.request.json["signal_type"]]
signal_value = bottle.request.json["signal_value"]
return bank_ops.add_detached_bank_member_signal(
banks_table=table_manager,
bank_id=bank_id,
content_type=content_type,
signal_type=signal_type,
signal_value=signal_value,
)
# Miscellaneous
@bank_api.post("/get-media-upload-url")
def get_media_upload_url(media_type=None):
"""
Get a presigned S3 url that can be used by the client to PUT an object.
Request Payload must be json with the following attributes:
`media_type` must be something like ['image/gif', 'image/png', 'application/zip']
`extension` must be a period followed by file extension. eg. `.mp4`, `.jpg`
"""
extension = bottle.request.json.get("extension")
media_type = bottle.request.json.get("media_type")
if (not extension) or extension[0] != ".":
bottle.abort(400, "extension must start with a period. eg. '.mp4'")
id = str(uuid.uuid4())
today_fragment = datetime.now().isoformat("|").split("|")[0] # eg. 2019-09-12
s3_key = f"bank-media/{media_type}/{today_fragment}/{id}{extension}"
return {
"storage_bucket": bank_user_media_bucket,
"storage_key": s3_key,
"upload_url": create_presigned_put_url(
bucket_name=bank_user_media_bucket,
key=s3_key,
file_type=media_type,
expiration=3600,
),
}
@bank_api.get("/get-member/<bank_member_id>", apply=[jsoninator])
def get_member(bank_member_id=None) -> PreviewableBankMemberWithSignals:
"""
Get a bank member with signals...
"""
member = table_manager.get_bank_member(bank_member_id=bank_member_id)
signals = table_manager.get_signals_for_bank_member(
bank_member_id=bank_member_id
)
return PreviewableBankMemberWithSignals(
**asdict(with_preview_url(member)), signals=signals
)
@bank_api.post("/update-bank-member/<bank_member_id>", apply=[jsoninator])
def update_bank_member(bank_member_id=None) -> Bank:
"""
Update notes and tags for a bank_member_id.
"""
return table_manager.update_bank_member(
bank_member_id=bank_member_id,
notes=bottle.request.json["notes"],
bank_member_tags=set(bottle.request.json["bank_member_tags"]),
)
@bank_api.post("/remove-bank-member/<bank_member_id>")
def remove_bank_member(bank_member_id: str):
"""
Remove bank member signals from the processing index and mark
bank_member as is_removed=True.
Returns empty json object.
"""
bank_ops.remove_bank_member(
banks_table=table_manager,
bank_member_id=bank_member_id,
)
return {}
return bank_api