python-threatexchange/threatexchange/fetcher/apis/fb_threatexchange_api.py (211 lines of code) (raw):
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
"""
SignalExchangeAPI impl for Facebook/Meta's ThreatExchange Graph API platform.
https://developers.facebook.com/programs/threatexchange
https://developers.facebook.com/docs/threat-exchange/reference/apis/
"""
import typing as t
import time
from dataclasses import dataclass, field
from threatexchange.fb_threatexchange.threat_updates import ThreatUpdateJSON
from threatexchange.fetcher.simple.state import SimpleFetchDelta
from threatexchange.fb_threatexchange.api import ThreatExchangeAPI, _CursoredResponse
from threatexchange.fetcher import fetch_state as state
from threatexchange.fetcher.fetch_api import SignalExchangeAPI
from threatexchange.fetcher.collab_config import (
CollaborationConfigBase,
DefaultsForCollabConfigBase,
)
from threatexchange.signal_type.signal_base import SignalType
@dataclass
class FBThreatExchangeCollabConfig(
    CollaborationConfigBase, DefaultsForCollabConfigBase
):
    """
    Collaboration settings for reading one ThreatExchange privacy group.

    `privacy_group` is the id handed to the /threat_updates fetch.
    """

    # ThreatExchange privacy group id to fetch updates from
    privacy_group: int
    # Not read in this module; presumably a per-collab app token that
    # overrides the default one - TODO confirm against callers
    app_token_override: t.Optional[str] = field(default=None)
@dataclass
class FBThreatExchangeCheckpoint(state.FetchCheckpointBase):
    """
    Progress marker for tailing the ThreatExchange /threat_updates endpoint.

    The endpoint only keeps deletion records for a limited window. A client
    that stops tailing for too long can no longer tell which of its records
    were deleted, and must refetch the entire dataset from scratch.
    """

    # Highest update timestamp reached; used as start_time for the next fetch
    update_time: int = 0
    # Epoch seconds at which this checkpoint was created
    last_fetch_time: int = field(default_factory=lambda: int(time.time()))

    def is_stale(self) -> bool:
        """
        Return True once the checkpoint is too old to resume safely.

        The API retains updates for 90 days; the 85-day cutoff leaves margin.
        https://developers.facebook.com/docs/threat-exchange/reference/apis/threat-updates/
        """
        max_age_seconds = 85 * 24 * 60 * 60  # 85 days
        age_seconds = time.time() - self.last_fetch_time
        return age_seconds > max_age_seconds

    def get_progress_timestamp(self) -> int:
        """Progress is reported as the latest update_time reached."""
        return self.update_time
@dataclass
class FBThreatExchangeOpinion(state.SignalOpinion):
    """
    A SignalOpinion that also records which ThreatExchange descriptor it
    was derived from.
    """

    # Id of the source descriptor, or REACTION_DESCRIPTOR_ID when the opinion
    # was inferred from a reaction rather than an owned descriptor
    descriptor_id: t.Optional[int]

    # Sentinel descriptor_id for reaction-derived opinions (not a dataclass field)
    REACTION_DESCRIPTOR_ID: t.ClassVar[int] = -1
@dataclass
class FBThreatExchangeIndicatorRecord(state.FetchedSignalMetadata):
    """
    Per-indicator metadata built from one /threat_updates record.

    Holds at most one opinion per owner, taken either from a descriptor the
    owner wrote or, failing that, from the owner's reactions.
    """

    opinions: t.List[FBThreatExchangeOpinion]

    def get_as_opinions(  # type: ignore # Why can't mypy tell this is a subclass?
        self,
    ) -> t.List[FBThreatExchangeOpinion]:
        return self.opinions

    @classmethod
    def from_threatexchange_json(
        cls, te_json: ThreatUpdateJSON
    ) -> t.Optional["FBThreatExchangeIndicatorRecord"]:
        """
        Convert one threat update into a record.

        Each descriptor yields an explicit opinion from its owner. The
        category comes from the descriptor status: MALICIOUS -> TRUE_POSITIVE,
        NON_MALICIOUS -> FALSE_POSITIVE, anything else -> WORTH_INVESTIGATING.

        Reactions yield implicit opinions, which are only used for owners
        without a descriptor of their own: HELPFUL -> TRUE_POSITIVE and
        DISAGREE_WITH_TAGS -> FALSE_POSITIVE. HELPFUL wins if both occur.

        Returns None for deletions, and when no opinion could be derived.
        """
        if te_json.should_delete:
            return None

        explicit_opinions: t.Dict[int, FBThreatExchangeOpinion] = {}
        implicit_opinions: t.Dict[int, state.SignalOpinionCategory] = {}

        for td_json in te_json.raw_json["descriptors"]["data"]:
            td_id = int(td_json["id"])
            owner_id = int(td_json["owner"]["id"])
            # Plain string (e.g. "MALICIOUS"); compared directly below
            status = td_json["status"]
            tags = td_json.get("tags", [])
            # ThreatExchangeAPI.get_threat_descriptors() already transforms
            # tags, but other sources return {"data": [{"text": ...}, ...]}
            if isinstance(tags, dict):
                tags = [tag["text"] for tag in tags["data"]]

            category = state.SignalOpinionCategory.WORTH_INVESTIGATING
            if status == "MALICIOUS":
                category = state.SignalOpinionCategory.TRUE_POSITIVE
            elif status == "NON_MALICIOUS":
                category = state.SignalOpinionCategory.FALSE_POSITIVE

            # Tags are stored as a set, like the reaction-derived opinions
            explicit_opinions[owner_id] = FBThreatExchangeOpinion(
                owner_id, category, set(tags), td_id
            )

            for reaction in td_json.get("reactions", []):
                rxn = reaction["key"]
                owner = int(reaction["value"])
                if rxn == "HELPFUL":
                    implicit_opinions[owner] = (
                        state.SignalOpinionCategory.TRUE_POSITIVE
                    )
                elif rxn == "DISAGREE_WITH_TAGS" and owner not in implicit_opinions:
                    implicit_opinions[owner] = (
                        state.SignalOpinionCategory.FALSE_POSITIVE
                    )

        for owner_id, category in implicit_opinions.items():
            # An owner's own descriptor takes precedence over their reactions
            if owner_id in explicit_opinions:
                continue
            explicit_opinions[owner_id] = FBThreatExchangeOpinion(
                owner_id,
                category,
                set(),
                FBThreatExchangeOpinion.REACTION_DESCRIPTOR_ID,
            )

        if not explicit_opinions:
            # Visibility bug of some kind on TE API :(
            return None
        return cls(list(explicit_opinions.values()))

    @staticmethod
    def te_threat_updates_fields() -> t.Tuple[str, ...]:
        """
        The input to the "fields" selector for /threat_updates requests.

        The descriptor sub-fields are exactly those read by
        from_threatexchange_json.
        """
        descriptor_fields = ("id", "reactions", "owner{id}", "tags", "status")
        return (
            "indicator",
            "type",
            "last_updated",
            "should_delete",
            "descriptors{%s}" % ",".join(descriptor_fields),
        )
class FBThreatExchangeSignalExchangeAPI(SignalExchangeAPI):
    """
    SignalExchangeAPI backed by the ThreatExchange /threat_updates endpoint.

    Fetching is implemented. Owner resolution and reporting raise
    NotImplementedError for now.
    """

    def __init__(self, fb_app_token: t.Optional[str] = None) -> None:
        self._api: t.Optional[ThreatExchangeAPI] = None
        if fb_app_token is not None:
            self._api = ThreatExchangeAPI(fb_app_token)
        # Open paginated fetches, keyed by collaboration name
        self.cursors: t.Dict[str, _CursoredResponse] = {}

    @property
    def api(self) -> ThreatExchangeAPI:
        """The configured client; raises if no app token was supplied."""
        if self._api is None:
            raise Exception("App Developer token not configured.")
        return self._api

    @classmethod
    def get_checkpoint_cls(cls) -> t.Type[state.FetchCheckpointBase]:
        return FBThreatExchangeCheckpoint

    @classmethod
    def get_record_cls(cls) -> t.Type[FBThreatExchangeIndicatorRecord]:
        return FBThreatExchangeIndicatorRecord

    @classmethod
    def get_config_class(cls) -> t.Type[FBThreatExchangeCollabConfig]:
        return FBThreatExchangeCollabConfig

    def resolve_owner(self, id: int) -> str:
        # TODO -This is supported by the API
        raise NotImplementedError

    def get_own_owner_id(  # type: ignore[override] # fix with generics on base
        self, collab: FBThreatExchangeCollabConfig
    ) -> int:
        """The app id of the configured client; `collab` is not consulted."""
        return self.api.app_id

    def fetch_once(  # type: ignore # fix with generics on base
        self,
        supported_signal_types: t.List[t.Type[SignalType]],
        collab: FBThreatExchangeCollabConfig,
        checkpoint: t.Optional[FBThreatExchangeCheckpoint],
    ) -> state.FetchDelta:
        """
        Fetch the next page of threat updates for `collab`.

        On the first call for a collaboration, a cursor is opened starting at
        `checkpoint.update_time` (or from the beginning if there is no
        checkpoint). That cursor is reused on later calls until it reports
        done; it is then discarded, so the following call starts a fresh
        cursor from the newer checkpoint.

        The returned checkpoint never moves backwards, even if the page is
        empty.
        """
        cursor = self.cursors.get(collab.name)
        start_time = None if checkpoint is None else checkpoint.update_time
        if cursor is None:
            cursor = self.api.get_threat_updates(
                collab.privacy_group,
                start_time=start_time,
                page_size=500,
                # Must request every descriptor field the record decoder reads
                fields=FBThreatExchangeIndicatorRecord.te_threat_updates_fields(),
                decode_fn=ThreatUpdateJSON,
            )
            self.cursors[collab.name] = cursor

        batch: t.List[ThreatUpdateJSON] = []
        # Seed with the prior progress so an empty page can't reset it to 0
        highest_time = start_time or 0
        for update in cursor.next():
            # TODO catch errors here
            batch.append(update)
            # Is supposed to be strictly increasing
            highest_time = max(update.time, highest_time)

        done = cursor.done
        if done:
            # An exhausted cursor can't yield new updates; drop it
            self.cursors.pop(collab.name, None)

        # TODO - correctly check types (supported_signal_types is unused)
        return SimpleFetchDelta(
            {
                (
                    u.threat_type,
                    u.indicator,
                ): FBThreatExchangeIndicatorRecord.from_threatexchange_json(
                    u
                )  # type: ignore # TODO, this is a real type error, but functional for now
                for u in batch
            },
            FBThreatExchangeCheckpoint(highest_time),
            done=done,
        )

    def report_seen(  # type: ignore[override] # fix with generics on base
        self,
        collab: FBThreatExchangeCollabConfig,
        s_type: t.Type[SignalType],
        signal: str,
        metadata: state.FetchedStateStoreBase,
    ) -> None:
        # TODO - this is supported by the API
        raise NotImplementedError

    def report_opinion(  # type: ignore[override] # fix with generics on base
        self,
        collab: FBThreatExchangeCollabConfig,
        s_type: t.Type[SignalType],
        signal: str,
        opinion: state.SignalOpinion,
    ) -> None:
        # TODO - this is supported by the API
        raise NotImplementedError

    def report_true_positive(  # type: ignore[override] # fix with generics on base
        self,
        collab: FBThreatExchangeCollabConfig,
        s_type: t.Type[SignalType],
        signal: str,
        metadata: state.FetchedSignalMetadata,
    ) -> None:
        """Report a TRUE_POSITIVE opinion, owned by this app, via report_opinion."""
        # TODO - this is supported by the API
        self.report_opinion(
            collab,
            s_type,
            signal,
            state.SignalOpinion(
                owner=self.get_own_owner_id(collab),
                category=state.SignalOpinionCategory.TRUE_POSITIVE,
                tags=set(),
            ),
        )

    def report_false_positive(  # type: ignore[override] # fix with generics on base
        self,
        collab: FBThreatExchangeCollabConfig,
        s_type: t.Type[SignalType],
        signal: str,
        _metadata: state.FetchedSignalMetadata,
    ) -> None:
        """Report a FALSE_POSITIVE opinion, owned by this app, via report_opinion."""
        self.report_opinion(
            collab,
            s_type,
            signal,
            state.SignalOpinion(
                owner=self.get_own_owner_id(collab),
                category=state.SignalOpinionCategory.FALSE_POSITIVE,
                tags=set(),
            ),
        )