hasher-matcher-actioner/hmalib/common/models/signal.py (140 lines of code) (raw):
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
r"""Signal Models
The structure of a signal and the data we want to retain on it is
super-dependent on the source of the signal. Trying to make a "base" signal
model can be futile because of the disparity of attributes and actions that
different sources can have.
Instead of abstracting out, for signals, we are creating separate models for
each source with their own feature evolution. This reality must also manifest on
the UI. eg. The actions you can take for threatexchange signals are distinct
from the actions you can take on your local bank's signals.
But, since the id-universes of the signals are distinct, how do we precisely
refer to a signal when using a signal id? This is a little complex because a
signal is uniquely defined by two of its attributes: `signal_source` and
`signal_id`.
So, every signal class needs to define a property `signal_source` which will be
used in indexing, queries etc.
When we add more signal sources, we'll understand the complexity and identify a
clean path for supporting multiple signal-sources. For now, only support
threatexchange.
"""
import datetime
from enum import Enum
import typing as t
from dataclasses import dataclass, field, asdict
from mypy_boto3_dynamodb.service_resource import Table
from boto3.dynamodb.conditions import Attr, Key, And
from botocore.exceptions import ClientError
from threatexchange.content_type.meta import get_signal_types_by_name
from threatexchange.signal_type.signal_base import SignalType
from hmalib.common.models.models_base import DynamoDBItem
class PendingThreatExchangeOpinionChange(Enum):
MARK_TRUE_POSITIVE = "mark_true_positive"
MARK_FALSE_POSITIVE = "mark_false_positive"
REMOVE_OPINION = "remove_opinion"
NONE = "none"
@dataclass
class ThreatExchangeSignalMetadata(DynamoDBItem):
"""
This object is designed to be an ~lookaside on some of the values used by
MatchRecord for easier and more consistent updating by the syncer and UI.
We only write these objects when we match against a signal from
threatexchange. Updates can happen when:
a. a user registers an opinion against this signal.
b. the signal's attributes are updated in threatexchange.
User's registration of an opinion is a multi-stage process. The opinion is
first recorded on this object as a 'pending' opinion directly by the UI.
Once the opinion is written to threatexchange and synced back, the 'pending'
opinion is cleared.
As clarified in the module's doc, it is okay to expose the threatexchange
specific attribute 'privacy_group_id' because the capabilities that an
exchange offers is going to be varied enough that abstracting might be
impossible.
Storage
---
PK: s#{source_short_code}#{signal_id}
SK: pg#{privacy_group_id}
Where
{source_short_code} = "te"
and signal_id = {indicator_id} in threatexchange.
and privacy_group_id is privacy_group_id in threatexchange.
"""
SIGNAL_SOURCE_SHORTCODE = "te"
PRIVACY_GROUP_PREFIX = "pg#"
signal_id: str
privacy_group_id: str
updated_at: datetime.datetime
signal_type: t.Type[SignalType]
signal_hash: str
tags: t.List[str] = field(default_factory=list)
pending_opinion_change: PendingThreatExchangeOpinionChange = (
PendingThreatExchangeOpinionChange.NONE
)
PROJECTION_EXPRESSION = "PK, SignalHash, SignalSource, SignalType, PrivacyGroup, Tags, PendingThreatExchangeOpinionChange, UpdatedAt"
@classmethod
def get_sort_key(self, privacy_group_id: str) -> str:
return f"pg#{privacy_group_id}"
def to_json(self) -> t.Dict:
"""
Used by '/matches/for-hash' in lambdas/api/matches
"""
result = asdict(self)
result.update(
signal_type=self.signal_type.get_name(),
updated_at=self.updated_at.isoformat(),
pending_opinion_change=self.pending_opinion_change.value,
)
return result
def to_dynamodb_item(self) -> dict:
return {
"PK": self.get_dynamodb_signal_key(
self.SIGNAL_SOURCE_SHORTCODE, self.signal_id
),
"SK": self.get_sort_key(self.privacy_group_id),
"SignalHash": self.signal_hash,
"SignalSource": self.SIGNAL_SOURCE_SHORTCODE,
"UpdatedAt": self.updated_at.isoformat(),
"SignalType": self.signal_type.get_name(),
"PrivacyGroup": self.privacy_group_id,
"Tags": self.tags,
"PendingThreatExchangeOpinionChange": self.pending_opinion_change.value,
}
def update_tags_in_table_if_exists(self, table: Table) -> bool:
return self._update_field_in_table_if_exists(
table,
field_value=self.tags,
field_name="Tags",
)
def update_pending_opinion_change_in_table_if_exists(self, table: Table) -> bool:
return self._update_field_in_table_if_exists(
table,
field_value=self.pending_opinion_change.value,
field_name="PendingThreatExchangeOpinionChange",
)
def _update_field_in_table_if_exists(
self, table: Table, field_value: t.Any, field_name: str
) -> bool:
"""
Only write the field for object in table if the objects with matchig PK and SK already exist
(also updates updated_at).
Returns true if object existed and therefore update was successful otherwise false.
"""
try:
table.update_item(
Key={
"PK": self.get_dynamodb_signal_key(
self.SIGNAL_SOURCE_SHORTCODE, self.signal_id
),
"SK": self.get_sort_key(self.privacy_group_id),
},
# service_resource.Table.update_item's ConditionExpression params is not typed to use its own objects here...
ConditionExpression=And(Attr("PK").exists(), Attr("SK").exists()), # type: ignore
ExpressionAttributeValues={
":f": field_value,
":u": self.updated_at.isoformat(),
},
ExpressionAttributeNames={
"#F": field_name,
"#U": "UpdatedAt",
},
UpdateExpression="SET #F = :f, #U = :u",
)
except ClientError as e:
if e.response["Error"]["Code"] != "ConditionalCheckFailedException":
raise e
return False
return True
@classmethod
def get_from_signal(
cls,
table: Table,
signal_id: t.Union[str, int],
) -> t.List["ThreatExchangeSignalMetadata"]:
"""
Load objects for this signal across all privacy groups. A signal_id,
which maps to indicator_id on threatexchange, can be part of multiple
privacy groups. Opinions are registered on a (privacy_group,
indicator_id) tuple. Not exactly, but kind of.
"""
pk = cls.get_dynamodb_signal_key(cls.SIGNAL_SOURCE_SHORTCODE, signal_id)
items = table.query(
KeyConditionExpression=Key("PK").eq(pk)
& Key("SK").begins_with(cls.PRIVACY_GROUP_PREFIX),
ProjectionExpression=cls.PROJECTION_EXPRESSION,
).get("Items")
return cls._result_items_to_metadata(items or [])
@classmethod
def get_from_signal_and_privacy_group(
cls, table: Table, signal_id: t.Union[str, int], privacy_group_id: str
) -> t.Optional["ThreatExchangeSignalMetadata"]:
"""
Load object for this signal and privacy_group combination.
"""
pk = cls.get_dynamodb_signal_key(cls.SIGNAL_SOURCE_SHORTCODE, signal_id)
sk = cls.get_sort_key(privacy_group_id)
item = table.get_item(Key={"PK": pk, "SK": sk})
return "Item" in item and cls._result_item_to_metadata(item["Item"]) or None
@classmethod
def _result_items_to_metadata(
cls,
items: t.List[t.Dict],
) -> t.List["ThreatExchangeSignalMetadata"]:
return [cls._result_item_to_metadata(item) for item in items]
@classmethod
def _result_item_to_metadata(
cls,
item: t.Dict,
) -> "ThreatExchangeSignalMetadata":
return ThreatExchangeSignalMetadata(
signal_id=cls.remove_signal_key_prefix(item["PK"], item["SignalSource"]),
privacy_group_id=item["PrivacyGroup"],
updated_at=datetime.datetime.fromisoformat(item["UpdatedAt"]),
signal_type=get_signal_types_by_name()[item["SignalType"]],
signal_hash=item["SignalHash"],
tags=item["Tags"],
pending_opinion_change=PendingThreatExchangeOpinionChange(
item.get(
"PendingThreatExchangeOpinionChange",
PendingThreatExchangeOpinionChange.NONE.value,
)
),
)