hasher-matcher-actioner/hmalib/common/models/bank.py (487 lines of code) (raw):

# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved

import typing as t
from datetime import datetime
from dataclasses import asdict, dataclass, field
from enum import Enum
import uuid

from boto3.dynamodb.conditions import Attr, Key
from mypy_boto3_dynamodb.service_resource import Table

from threatexchange.content_type.content_base import ContentType
from threatexchange.content_type.meta import (
    get_content_type_for_name,
    get_signal_types_by_name,
)
from threatexchange.signal_type.signal_base import SignalType

from hmalib.common.models.models_base import (
    DynamoDBCursorKey,
    DynamoDBItem,
    PaginatedResponse,
)

"""
# DynamoDB Table HMABanks

## Models

Stores banks, bank-members and bank-member signals.

BankMemberSignals are signals extracted from the content in the bank member
object. A <signal_id> is unique for a (signal_type, signal_value) tuple. This
is enforced by the 4th type of entry in the default table index
(BankedSignalEntry).

## Indices

1. Default Table Index

Item                  | PK                        | SK
----------------------|---------------------------|------------------------------
Bank                  | #bank_object              | <bank_id>
BankMember            | <bank_id>#<content_type>  | member#<bank_member_id>
BankMemberSignal      | <bank_member_id>          | signal#<signal_id>
BankedSignalEntry     | signal_type#<signal_type> | signal_value#<signal_value>

Bank objects are all stored under the same, static partition key because all
bank information is expected to be less than 10GB. Also, when a bank is
deleted, it is removed from the (2) BankNameIndex. To still be able to find it,
there must be another way; that is provided by the "known" PK of #bank_object.

BankMembers have a PK of <bank_id>#<content_type>. This allows easy querying
for pages of members for a specific content_type. To look up a bank member
without knowing its content_type, use the BankMemberIdIndex (6) below.

2. BankNameIndex

A sparse index. Only contains BankObjects. Used to check if bank_name is
repeated. Because it is a sparse index, scans are cheap too. Project all
attributes. Cheap and useful if looking up by name.

PK              SK              Item
<bank_name>     <bank_id>       ALL(BankObject)

3. MemberToSignalIndex

Use for getting signals for a member. Sparse Index.

PK                  SK                          Item
<bank_member_id>    <bank_member_signal_id>     KEYS_ONLY(BankMemberSignal)

4. SignalIndex

Use to get a signal by its ID. Note, there can be multiple entries if the same
signal is emitted by multiple banks.

PK                          SK              Item
<bank_member_signal_id>     <bank_id>       KEYS_ONLY(BankMemberSignal)

5. BankMemberSignalCursorIndex

Contains BankMemberSignal objects in the order they were updated. This can be
used by any system that needs to process bank member signals in order. eg.
indexer, TBD-component that can write a bank to threatexchange.

PK              SK                                          Item
<signal_type>   <updated_at>:<first 12 chars of signal_id>  ALL(BankMemberSignal)

Signals are taken out of this index by removing its key attributes (see
BanksTable.remove_bank_member_signals_to_process).

6. BankMemberIdIndex

Used to find a bank member from its bank_member_id alone. Only BankMember
items carry this index's key attribute. BanksTable reads just the table keys
(PK, SK) from this index and then fetches the full item from the main table.

PK                  Item
<bank_member_id>    BankMember
"""


@dataclass
class Bank(DynamoDBItem):
    """
    Describes a bank object. A bank is an aggregate of bank-members.
    """

    BANK_OBJECT_PARTITION_KEY = "#bank_object"
    BANK_NAME_INDEX = "BankNameIndex"

    bank_id: str  # generated using uuid
    bank_name: str
    bank_description: str

    # Is used actively for matching. Think of as kill switch for the bank.
    is_active: bool

    created_at: datetime

    # Gets changed on updates to the bank object, not its members.
    updated_at: datetime

    bank_tags: t.Set[str] = field(default_factory=set)

    def to_dynamodb_item(self) -> t.Dict:
        """Serialize for the default table index and the BankNameIndex GSI."""
        return {
            # Main Index
            "PK": self.BANK_OBJECT_PARTITION_KEY,
            "SK": self.bank_id,
            # GSI: BankNameIndex
            "BankNameIndex-BankName": self.bank_name,
            "BankNameIndex-BankId": self.bank_id,
            # Attributes
            "BankId": self.bank_id,
            "BankName": self.bank_name,
            "BankDescription": self.bank_description,
            "CreatedAt": self.created_at.isoformat(),
            "UpdatedAt": self.updated_at.isoformat(),
            "IsActive": self.is_active,
            "BankTags": self.set_to_dynamodb_attribute(self.bank_tags),
        }

    @classmethod
    def from_dynamodb_item(cls, item: t.Dict) -> "Bank":
        """Inverse of to_dynamodb_item; tolerates items missing optional fields."""
        return cls(
            bank_id=item["BankId"],
            bank_name=item["BankName"],
            bank_description=item["BankDescription"],
            created_at=datetime.fromisoformat(item["CreatedAt"]),
            updated_at=datetime.fromisoformat(item["UpdatedAt"]),
            # is_active is True by default.
            is_active=item.get("IsActive", True),
            # tags default to empty set
            bank_tags=cls.dynamodb_attribute_to_set(item.get("BankTags", set())),
        )

    def to_json(self) -> t.Dict:
        """Used in APIs. Datetimes become ISO strings, tags become a list."""
        result = asdict(self)
        result.update(
            created_at=self.created_at.isoformat(),
            updated_at=self.updated_at.isoformat(),
            bank_tags=list(self.bank_tags),
        )
        return result


@dataclass
class BankMember(DynamoDBItem):
    """
    Describes a bank member. A bank member is a piece of content. eg. text,
    video, photo that a partner wants to match against.

    A BankMember can also be virtual, which means the actual media is
    unavailable. This could happen if the bank is synced with a source that
    only provides hashes, or it could be that the media has been removed to
    comply with retention policies.

    Since virtual could be a loaded term, we use the more explicit and accurate
    `is_media_unavailable` attribute.
    """

    BANK_MEMBER_ID_PREFIX = "member#"

    BANK_MEMBER_ID_INDEX = "BankMemberIdIndex"
    BANK_MEMBER_ID_INDEX_BANK_MEMBER_ID = f"{BANK_MEMBER_ID_INDEX}-BankMemberId"

    bank_id: str
    bank_member_id: str

    content_type: t.Type[ContentType]

    # When storing media files, store the bucket and the key. If files are
    # deleted because of legal / retention policies, this indicator will stay
    # as-is even if the actual s3 object is deleted.
    storage_bucket: t.Optional[str]
    storage_key: t.Optional[str]

    # In case we are storing the content directly in dynamodb.
    raw_content: t.Optional[str]

    notes: str

    created_at: datetime
    updated_at: datetime

    is_removed: bool = field(default=False)
    is_media_unavailable: bool = field(default=False)

    bank_member_tags: t.Set[str] = field(default_factory=set)

    @classmethod
    def get_pk(cls, bank_id: str, content_type: t.Type[ContentType]) -> str:
        """Partition key: one partition per (bank, content_type) pair."""
        return f"{bank_id}#{content_type.get_name()}"

    @classmethod
    def get_sk(cls, bank_member_id: str) -> str:
        return f"{cls.BANK_MEMBER_ID_PREFIX}{bank_member_id}"

    def to_dynamodb_item(self) -> t.Dict:
        """Serialize for the default table index and the BankMemberIdIndex GSI."""
        return {
            # Main Index
            "PK": self.get_pk(self.bank_id, self.content_type),
            "SK": self.get_sk(self.bank_member_id),
            # BankMemberId Index
            self.BANK_MEMBER_ID_INDEX_BANK_MEMBER_ID: self.bank_member_id,
            # Attributes
            "BankId": self.bank_id,
            "BankMemberId": self.bank_member_id,
            "ContentType": self.content_type.get_name(),
            "StorageBucket": self.storage_bucket,
            "StorageKey": self.storage_key,
            "RawContent": self.raw_content,
            "Notes": self.notes,
            "CreatedAt": self.created_at.isoformat(),
            "UpdatedAt": self.updated_at.isoformat(),
            "IsRemoved": self.is_removed,
            "IsMediaUnavailable": self.is_media_unavailable,
            "BankMemberTags": self.set_to_dynamodb_attribute(self.bank_member_tags),
        }

    @classmethod
    def from_dynamodb_item(cls, item: t.Dict) -> "BankMember":
        """
        Inverse of to_dynamodb_item. Missing boolean flags fall back to the
        dataclass defaults (False) instead of raising KeyError.
        """
        return cls(
            bank_id=item["BankId"],
            bank_member_id=item["BankMemberId"],
            content_type=get_content_type_for_name(item["ContentType"]),
            storage_bucket=item["StorageBucket"],
            storage_key=item["StorageKey"],
            raw_content=item["RawContent"],
            notes=item["Notes"],
            created_at=datetime.fromisoformat(item["CreatedAt"]),
            updated_at=datetime.fromisoformat(item["UpdatedAt"]),
            is_removed=item.get("IsRemoved", False),
            is_media_unavailable=item.get("IsMediaUnavailable", False),
            # tags default to empty set
            bank_member_tags=cls.dynamodb_attribute_to_set(
                item.get("BankMemberTags", set())
            ),
        )

    def to_json(self) -> t.Dict:
        """Used in APIs. content_type is emitted as its name."""
        result = asdict(self)
        result.update(
            content_type=self.content_type.get_name(),
            created_at=self.created_at.isoformat(),
            updated_at=self.updated_at.isoformat(),
            bank_member_tags=list(self.bank_member_tags),
        )
        return result


@dataclass
class BankMemberSignal(DynamoDBItem):
    """
    Describes a signal extracted from a bank member.
    """

    bank_id: str
    bank_member_id: str
    signal_id: str
    signal_type: t.Type[SignalType]
    signal_value: str
    updated_at: datetime

    BANK_MEMBER_SIGNAL_CURSOR_INDEX = "BankMemberSignalCursorIndex"
    BANK_MEMBER_SIGNAL_CURSOR_INDEX_SIGNAL_TYPE = (
        f"{BANK_MEMBER_SIGNAL_CURSOR_INDEX}-SignalType"
    )
    BANK_MEMBER_SIGNAL_CURSOR_INDEX_CHRONO_KEY = (
        f"{BANK_MEMBER_SIGNAL_CURSOR_INDEX}-ChronoKey"
    )

    # How many characters of the signal id to append to the chronological
    # ordering key so that signals updated at the same instant don't collide.
    CHRONO_KEY_SIGNAL_ID_FRAGMENT_SIZE = 12

    @classmethod
    def get_pk(cls, bank_member_id: str) -> str:
        return bank_member_id

    @classmethod
    def get_sk(cls, signal_id: str) -> str:
        return f"signal#{signal_id}"

    @classmethod
    def get_chrono_key(cls, updated_at: datetime, signal_id: str) -> str:
        """Sort key for the cursor index: '<iso updated_at>:<id fragment>'."""
        fragment = signal_id[: cls.CHRONO_KEY_SIGNAL_ID_FRAGMENT_SIZE]
        return f"{updated_at.isoformat()}:{fragment}"

    def to_dynamodb_item(self) -> t.Dict:
        """Serialize for the default index and BankMemberSignalCursorIndex."""
        return {
            # Main Index
            "PK": self.get_pk(bank_member_id=self.bank_member_id),
            "SK": self.get_sk(signal_id=self.signal_id),
            # Attributes
            "BankId": self.bank_id,
            "BankMemberId": self.bank_member_id,
            "SignalId": self.signal_id,
            "SignalType": self.signal_type.get_name(),
            "SignalValue": self.signal_value,
            "UpdatedAt": self.updated_at.isoformat(),
            # GSI: BankMemberSignalCursorIndex
            self.BANK_MEMBER_SIGNAL_CURSOR_INDEX_SIGNAL_TYPE: self.signal_type.get_name(),
            self.BANK_MEMBER_SIGNAL_CURSOR_INDEX_CHRONO_KEY: self.get_chrono_key(
                self.updated_at, self.signal_id
            ),
        }

    @classmethod
    def from_dynamodb_item(cls, item: t.Dict) -> "BankMemberSignal":
        return cls(
            bank_id=item["BankId"],
            bank_member_id=item["BankMemberId"],
            signal_id=item["SignalId"],
            signal_type=get_signal_types_by_name()[item["SignalType"]],
            signal_value=item["SignalValue"],
            updated_at=datetime.fromisoformat(item["UpdatedAt"]),
        )

    def to_json(self) -> t.Dict:
        """Used in APIs. signal_type is emitted as its name."""
        result = asdict(self)
        result.update(
            signal_type=self.signal_type.get_name(),
            updated_at=self.updated_at.isoformat(),
        )
        return result


@dataclass
class BankedSignalEntry(DynamoDBItem):
    """
    Enforces uniqueness for a signal_type and signal_value.
    """

    signal_type: t.Type[SignalType]
    signal_value: str
    signal_id: str

    @classmethod
    def get_pk(cls, signal_type: t.Type[SignalType]) -> str:
        return f"signal_type#{signal_type.get_name()}"

    @classmethod
    def get_sk(cls, signal_value: str) -> str:
        return f"signal_value#{signal_value}"

    def to_dynamodb_item(self) -> t.Dict:
        # Raise an error so that write_to_table() fails. We never want to do
        # that; entries are only written through get_unique().
        raise Exception("Do not write BankedSignalEntry to DDB directly!")

    @classmethod
    def get_unique(
        cls, table: Table, signal_type: t.Type[SignalType], signal_value: str
    ) -> "BankedSignalEntry":
        """
        Upsert the (signal_type, signal_value) entry and return it.

        SignalId is only written if the item does not already have one
        (if_not_exists), so every caller for the same tuple gets the same
        signal_id. SignalType and SignalValue are (re)written unconditionally.

        This is a special use-case for BankedSignalEntry. If this is useful to
        other models, we can move it to a mixin or to dynamodb item.

        If trying to generify, note how the update_item query needs a custom
        update query based on what you are trying to write. Generifying may be
        harder than it seems.
        """
        result = table.update_item(
            Key={
                "PK": cls.get_pk(signal_type),
                "SK": cls.get_sk(signal_value),
            },
            UpdateExpression=(
                "SET SignalId = if_not_exists(SignalId, :signal_id), "
                "SignalType = :signal_type, SignalValue = :signal_value"
            ),
            ExpressionAttributeValues={
                # Note we are generating a new uuid even though we don't always
                # expect it to get written. AFAIK, uuids are inexhaustible, and
                # generation performance is good enough to not worry about it.
                ":signal_id": str(uuid.uuid4()),
                ":signal_type": signal_type.get_name(),
                ":signal_value": signal_value,
            },
            ReturnValues="ALL_NEW",
        ).get("Attributes")

        assert result is not None

        return cls(
            signal_type=get_signal_types_by_name()[result["SignalType"]],
            signal_value=t.cast(str, result["SignalValue"]),
            signal_id=t.cast(str, result["SignalId"]),
        )


class BanksTable:
    """
    Provides query + update methods on the entire table.

    This is a departure from the norm in models from the content, pipeline and
    signals modules. There, we provide query methods in the model itself. But,
    here we're trying a single 'manager' for all classes of items in the table.

    NOTE: timestamps are created with datetime.now(), i.e. naive local time.
    """

    def __init__(self, table: Table):
        self._table = table

    # ------------------------------------------------------------------ #
    # Private helpers
    # ------------------------------------------------------------------ #

    @staticmethod
    def _fetch_all_pages(
        operation: t.Callable[..., t.Any], **kwargs: t.Any
    ) -> t.List[t.Dict[str, t.Any]]:
        """
        Call a table query/scan repeatedly, following LastEvaluatedKey, and
        return all items concatenated.

        A single query or scan call returns at most 1MB of data, so lookups
        whose results must be complete have to page through.
        """
        items: t.List[t.Dict[str, t.Any]] = []
        while True:
            response = operation(**kwargs)
            items.extend(response["Items"])
            last_evaluated_key = response.get("LastEvaluatedKey")
            if not last_evaluated_key:
                return items
            kwargs["ExclusiveStartKey"] = last_evaluated_key

    def _query_page(
        self,
        exclusive_start_key: t.Optional[DynamoDBCursorKey],
        **kwargs: t.Any,
    ) -> t.Tuple[t.List[t.Dict[str, t.Any]], DynamoDBCursorKey]:
        """
        Run a single query, resuming from exclusive_start_key when given.

        Returns the page's items and the cursor for the next page (None when
        DynamoDB did not return a LastEvaluatedKey).
        """
        if exclusive_start_key:
            kwargs["ExclusiveStartKey"] = exclusive_start_key
        result = self._table.query(**kwargs)
        return (
            result["Items"],
            t.cast(DynamoDBCursorKey, result.get("LastEvaluatedKey", None)),
        )

    def _get_bank_member_keys(self, bank_member_id: str) -> t.Dict[str, t.Any]:
        """
        Look up a bank member's main-table keys (PK, SK) via BankMemberIdIndex.

        Raises IndexError if no item in the index has this bank_member_id.
        """
        return self._table.query(
            IndexName=BankMember.BANK_MEMBER_ID_INDEX,
            KeyConditionExpression=Key(
                BankMember.BANK_MEMBER_ID_INDEX_BANK_MEMBER_ID
            ).eq(bank_member_id),
        )["Items"][0]

    # ------------------------------------------------------------------ #
    # Banks
    # ------------------------------------------------------------------ #

    def create_bank(
        self,
        bank_name: str,
        bank_description: str,
        is_active: bool = False,
        bank_tags: t.Optional[t.Set[str]] = None,
    ) -> Bank:
        """
        Create and persist a new bank with a fresh uuid4 bank_id.

        bank_tags defaults to an empty set. The given tags are copied so a
        shared or later-mutated set cannot leak into the stored Bank object.
        """
        now = datetime.now()
        bank = Bank(
            bank_id=str(uuid.uuid4()),
            bank_name=bank_name,
            bank_description=bank_description,
            created_at=now,
            updated_at=now,
            is_active=is_active,
            bank_tags=set(bank_tags or ()),
        )
        bank.write_to_table(table=self._table)
        return bank

    def get_bank(self, bank_id: str) -> Bank:
        """
        Fetch a bank by id. get_item omits "Item" when the key is absent, so a
        missing bank raises KeyError.
        """
        return Bank.from_dynamodb_item(
            self._table.get_item(
                Key={"SK": bank_id, "PK": Bank.BANK_OBJECT_PARTITION_KEY}
            )["Item"]
        )

    def get_all_banks(self) -> t.List[Bank]:
        """Return every bank, scanning the sparse BankNameIndex to completion."""
        return [
            Bank.from_dynamodb_item(item)
            for item in self._fetch_all_pages(
                self._table.scan, IndexName=Bank.BANK_NAME_INDEX
            )
        ]

    def update_bank(
        self,
        bank_id: str,
        bank_name: t.Optional[str],
        bank_description: t.Optional[str],
        is_active: t.Optional[bool],
        bank_tags: t.Optional[t.Set[str]] = None,
    ) -> Bank:
        """
        Partially update a bank and write it back.

        Falsy bank_name / bank_description and None is_active / bank_tags
        leave the corresponding field unchanged. updated_at is bumped only if
        at least one field was provided. The bank is written back either way.
        """
        bank = self.get_bank(bank_id)
        changed = False

        if bank_name:
            bank.bank_name = bank_name
            changed = True

        if bank_description:
            bank.bank_description = bank_description
            changed = True

        if is_active is not None:
            bank.is_active = bool(is_active)
            changed = True

        if bank_tags is not None:
            bank.bank_tags = set(bank_tags)
            changed = True

        if changed:
            bank.updated_at = datetime.now()

        bank.write_to_table(table=self._table)
        return bank

    # ------------------------------------------------------------------ #
    # Bank members
    # ------------------------------------------------------------------ #

    def get_all_bank_members_page(
        self,
        bank_id: str,
        content_type: t.Type[ContentType],
        exclusive_start_key: t.Optional[DynamoDBCursorKey] = None,
        limit: int = 100,
    ) -> PaginatedResponse[BankMember]:
        """
        Return one page of non-removed members of a bank for a content_type,
        newest sort key first.

        DynamoDB applies the IsRemoved filter after reading `limit` items, so
        a page may contain fewer than `limit` members (even zero) while still
        returning a cursor for the next page.
        """
        items, last_evaluated_key = self._query_page(
            exclusive_start_key,
            ScanIndexForward=False,
            KeyConditionExpression=Key("PK").eq(
                BankMember.get_pk(bank_id=bank_id, content_type=content_type)
            ),
            FilterExpression=Attr("IsRemoved").eq(False),
            Limit=limit,
        )
        return PaginatedResponse(
            last_evaluated_key,
            [BankMember.from_dynamodb_item(item) for item in items],
        )

    def add_bank_member(
        self,
        bank_id: str,
        content_type: t.Type[ContentType],
        storage_bucket: t.Optional[str],
        storage_key: t.Optional[str],
        raw_content: t.Optional[str],
        notes: str,
        is_media_unavailable: bool = False,
        bank_member_tags: t.Optional[t.Set[str]] = None,
    ) -> BankMember:
        """
        Adds a member to the bank. DOES NOT enforce retroaction. DOES NOT
        extract signals.

        Merely a facade to the storage layer. Additional co-ordination (hashing,
        retroactioning, index updates) should happen via
        hmalib.banks.bank_operations module.

        bank_member_tags defaults to an empty set; the given tags are copied.
        """
        now = datetime.now()
        bank_member = BankMember(
            bank_id=bank_id,
            bank_member_id=str(uuid.uuid4()),
            content_type=content_type,
            storage_bucket=storage_bucket,
            storage_key=storage_key,
            raw_content=raw_content,
            notes=notes,
            created_at=now,
            updated_at=now,
            is_media_unavailable=is_media_unavailable,
            bank_member_tags=set(bank_member_tags or ()),
        )
        bank_member.write_to_table(self._table)
        return bank_member

    def get_bank_member(self, bank_member_id: str) -> BankMember:
        """
        Fetch a bank member by id alone, via BankMemberIdIndex.

        Raises IndexError if the id is not in the index.
        """
        member_keys = self._get_bank_member_keys(bank_member_id)
        return BankMember.from_dynamodb_item(
            self._table.get_item(
                Key={"PK": member_keys["PK"], "SK": member_keys["SK"]}
            )["Item"]
        )

    def update_bank_member(
        self, bank_member_id: str, notes: str, bank_member_tags: t.Set[str]
    ) -> BankMember:
        """
        Updates the notes and tags for a bank member identified by
        bank_member_id, bumps its updated_at and writes it back.
        """
        bank_member = self.get_bank_member(bank_member_id)
        bank_member.notes = notes
        bank_member.bank_member_tags = set(bank_member_tags)
        bank_member.updated_at = datetime.now()
        bank_member.write_to_table(self._table)
        return bank_member

    def remove_bank_member(self, bank_member_id: str):
        """
        Removes the bank member and associated signals from the bank.

        Merely marks as removed (and bumps updated_at), does not physically
        delete from the store.

        DOES NOT stop matching until index is updated.
        DOES NOT undo any actions already taken.
        """
        bank_member = self.get_bank_member(bank_member_id)
        bank_member.is_removed = True
        bank_member.updated_at = datetime.now()
        bank_member.write_to_table(self._table)

    # ------------------------------------------------------------------ #
    # Bank member signals
    # ------------------------------------------------------------------ #

    def add_bank_member_signal(
        self,
        bank_id: str,
        bank_member_id: str,
        signal_type: t.Type[SignalType],
        signal_value: str,
    ) -> BankMemberSignal:
        """
        Adds a BankMemberSignal entry. First, identifies if a signal for the
        corresponding (type, value) tuple exists: if so, reuses its id; if not,
        creates a new one.

        Returns a BankMemberSignal object. Clients **should not** care whether
        this is a new signal_id or not.

        This check is being done here because signal uniqueness is enforced by
        the same table. If this were being done in a different table/store, we
        could be doing the check at a different layer eg.
        hmalib.banks.bank_operations.
        """
        # First, we get a unique signal_id!
        signal_id = BankedSignalEntry.get_unique(
            self._table, signal_type=signal_type, signal_value=signal_value
        ).signal_id

        # Next, we create the bank member signal
        member_signal = BankMemberSignal(
            bank_id=bank_id,
            bank_member_id=bank_member_id,
            signal_id=signal_id,
            signal_type=signal_type,
            signal_value=signal_value,
            updated_at=datetime.now(),
        )
        member_signal.write_to_table(self._table)
        return member_signal

    def add_detached_bank_member_signal(
        self,
        bank_id: str,
        content_type: t.Type[ContentType],
        signal_type: t.Type[SignalType],
        signal_value: str,
    ) -> BankMemberSignal:
        """
        Adds a BankMemberSignal without needing a pre-existing BankMember.
        Pretty much the same as add_bank_member_signal otherwise.

        Creates a BankMember with is_media_unavailable=True to own the signal.
        """
        bank_member = self.add_bank_member(
            bank_id=bank_id,
            content_type=content_type,
            storage_bucket=None,
            storage_key=None,
            raw_content=None,
            notes="",
            is_media_unavailable=True,
        )

        return self.add_bank_member_signal(
            bank_id=bank_id,
            bank_member_id=bank_member.bank_member_id,
            signal_type=signal_type,
            signal_value=signal_value,
        )

    def remove_bank_member_signals_to_process(self, bank_member_id: str):
        """
        For a bank_member, remove all signals from the
        BankMemberSignalCursorIndex on this table.

        All systems that want to "do" something with bank_member_signals use
        this index. eg. building indexes, syncing signals to another
        hash_exchange.

        Removal works by deleting the index's key attributes from each signal
        item (the index is sparse); UpdatedAt is refreshed at the same time.
        """
        for signal in self.get_signals_for_bank_member(bank_member_id=bank_member_id):
            self._table.update_item(
                Key={
                    "PK": BankMemberSignal.get_pk(bank_member_id=bank_member_id),
                    "SK": BankMemberSignal.get_sk(signal.signal_id),
                },
                UpdateExpression="SET UpdatedAt = :updated_at REMOVE #gsi_pk, #gsi_sk",
                ExpressionAttributeNames={
                    "#gsi_pk": BankMemberSignal.BANK_MEMBER_SIGNAL_CURSOR_INDEX_SIGNAL_TYPE,
                    "#gsi_sk": BankMemberSignal.BANK_MEMBER_SIGNAL_CURSOR_INDEX_CHRONO_KEY,
                },
                ExpressionAttributeValues={":updated_at": datetime.now().isoformat()},
            )

    def get_bank_member_signals_to_process_page(
        self,
        signal_type: t.Type[SignalType],
        exclusive_start_key: t.Optional[DynamoDBCursorKey] = None,
        limit: int = 100,
    ) -> PaginatedResponse[BankMemberSignal]:
        """
        Return one page of signals of `signal_type` from the cursor index,
        oldest chrono key first.
        """
        items, last_evaluated_key = self._query_page(
            exclusive_start_key,
            IndexName=BankMemberSignal.BANK_MEMBER_SIGNAL_CURSOR_INDEX,
            ScanIndexForward=True,
            KeyConditionExpression=Key(
                BankMemberSignal.BANK_MEMBER_SIGNAL_CURSOR_INDEX_SIGNAL_TYPE
            ).eq(signal_type.get_name()),
            Limit=limit,
        )
        return PaginatedResponse(
            last_evaluated_key,
            [BankMemberSignal.from_dynamodb_item(item) for item in items],
        )

    def get_signals_for_bank_member(
        self, bank_member_id: str
    ) -> t.List[BankMemberSignal]:
        """Return all signals stored under a bank member's partition."""
        return [
            BankMemberSignal.from_dynamodb_item(item)
            for item in self._fetch_all_pages(
                self._table.query,
                KeyConditionExpression=Key("PK").eq(
                    BankMemberSignal.get_pk(bank_member_id=bank_member_id)
                ),
            )
        ]

    def get_bank_member_signal_from_id(
        self, signal_id: str
    ) -> t.List[BankMemberSignal]:
        """
        Hacky (not efficient): we need to add an index entry or object to change
        this look up to a query (not a scan) before we can support very large
        banks.

        The scan runs over BankMemberSignalCursorIndex, so signals whose cursor
        keys were removed by remove_bank_member_signals_to_process are not
        found.

        This currently does not provide bank name, bank tags, or bank member
        tags to avoid yet another look up. (we should find a way to avoid the
        scan before adding such options)
        """
        return [
            BankMemberSignal.from_dynamodb_item(item)
            for item in self._fetch_all_pages(
                self._table.scan,
                IndexName=BankMemberSignal.BANK_MEMBER_SIGNAL_CURSOR_INDEX,
                FilterExpression=Attr("SK").eq(BankMemberSignal.get_sk(signal_id)),
            )
        ]