hasher-matcher-actioner/hmalib/common/models/models_base.py (95 lines of code) (raw):
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
from dataclasses import dataclass
from decimal import Decimal
import typing as t
from mypy_boto3_dynamodb.service_resource import Table
from botocore.exceptions import ClientError
class DynamoDBItem:
CONTENT_KEY_PREFIX = "c#"
SIGNAL_KEY_PREFIX = "s#"
TYPE_PREFIX = "type#"
SET_PLACEHOLDER_VALUE = "SET_PLACEHOLDER_VALUE"
def write_to_table(self, table: Table):
table.put_item(Item=self.to_dynamodb_item())
def write_to_table_if_not_found(self, table: Table) -> bool:
"""
Write record to DDB if the PK/SK combination does not exist.
Returns:
* True when record was written (did not exist)
* False when record could not be written (PK/SK combo existed)
"""
try:
table.put_item(
Item=self.to_dynamodb_item(),
ConditionExpression="attribute_not_exists(PK) AND attribute_not_exists(SK)",
)
except ClientError as client_error:
# boto3 exception handling https://imgflip.com/i/5f5zfj
if (
client_error.response.get("Error", {"Code", "Unknown"}).get(
"Code", "Unknown"
)
== "ConditionalCheckFailedException"
):
return False
else:
raise client_error
return True
def to_dynamodb_item(self) -> t.Dict:
raise NotImplementedError
@staticmethod
def get_dynamodb_content_key(c_id: str) -> str:
return f"{DynamoDBItem.CONTENT_KEY_PREFIX}{c_id}"
@staticmethod
def get_dynamodb_signal_key(source: str, s_id: t.Union[str, int]) -> str:
return f"{DynamoDBItem.SIGNAL_KEY_PREFIX}{source}#{s_id}"
@staticmethod
def remove_signal_key_prefix(key: str, source: str) -> str:
return key[len(DynamoDBItem.SIGNAL_KEY_PREFIX) + len(source) + 1 :]
@staticmethod
def get_dynamodb_type_key(type: str) -> str:
return f"{DynamoDBItem.TYPE_PREFIX}{type}"
@staticmethod
def remove_content_key_prefix(key: str) -> str:
return key[len(DynamoDBItem.CONTENT_KEY_PREFIX) :]
@classmethod
def set_to_dynamodb_attribute(cls, value: t.Set) -> t.Set:
if not value or len(value) == 0:
return set([cls.SET_PLACEHOLDER_VALUE])
return value
@classmethod
def dynamodb_attribute_to_set(cls, value: t.Set) -> t.Set:
if len(value) == 1:
elem = value.pop()
if elem == cls.SET_PLACEHOLDER_VALUE:
return set()
else:
value.add(elem)
return value
class AWSMessage:
def to_aws_message(self) -> str:
raise NotImplementedError
@classmethod
def from_aws_message(cls, message: str) -> "AWSMessage":
raise NotImplementedError
# DDB's internal LastEvaluatedKey and ExclusiveStartKey both follow this type.
# Naming Struggle: Is this a "real" cursor, dunno, but "DynamoDBKey" is too
# confusable. Needed to add something to distinguish. Using "Cursor" now.
DynamoDBCursorKey = t.NewType(
"DynamoDBCursorKey",
t.Dict[
str,
t.Union[
bytes,
bytearray,
str,
int,
bool,
Decimal,
t.Set[int],
t.Set[Decimal],
t.Set[str],
t.Set[bytes],
t.Set[bytearray],
t.List[t.Any],
t.Dict[str, t.Any],
None,
],
],
)
T = t.TypeVar("T")
@dataclass
class PaginatedResponse(t.Generic[T]):
"""
A generic paginated resopnse container for list of items queried/scanned
from dynamodb.
"""
last_evaluated_key: DynamoDBCursorKey
items: t.List[T]
def has_next_page(self):
"""
If query does not return last_evaluated_key, there are no more results
to return.
https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Query.Pagination.html
"""
return self.last_evaluated_key != None