python-threatexchange/threatexchange/fetcher/fetch_api.py (83 lines of code) (raw):

# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved """ The fetcher is the component that talks to external APIs to get and put signals @see SignalExchangeAPI """ import typing as t from threatexchange import common from threatexchange.fetcher.collab_config import CollaborationConfigBase from threatexchange.signal_type.signal_base import SignalType from threatexchange.fetcher import fetch_state as state # TODO t.Generic[TFetchDelta, TFetchedSignalData, TCollabConfig] # In order to make it easier to track the expected extensions for an API class SignalExchangeAPI: """ APIs to get and maybe put signals. Fetchers ideally can checkpoint their progress, so that they can tail updates. While this interface is primarily intended for connecting with externally hosted servers, it might be useful to write adopters for certain formats of local files, which could be valuable for testing. Is assumed that fetched signals have some metadata attached to them, which is unique to that API. Additionally, it a assumed that there might be multiple contributors (owners) to signals inside of an API. An instance of this class can retain state (caching connecting, etc) as needed. Methods with an implementation are optional, but may rely on = On Owner IDs = Some APIs may not have any concept of owner ID (because the owner is the API itself). In that case, it's suggested to use 0 for all IDs. If the you own the API, it may also make sense to have 0 be the return of get_own_owner_id, which is used by matching to tell if a signal be considered "confirmed" or not. If your API does support the concept of owners, make sure to implement resolve_owner and get_own_owner_id """ @classmethod def get_name(cls) -> str: """ A simple string name unique to SignalExchangeAPIs in use. It should be one lowercase_with_underscores word. This shouldn't be changed once comitted, or you may break naive storage solutions (like the one in the CLI) which stores fetched data by (SignalExchangeAPI.name(), collab_name). """ name = cls.__name__ for suffix in ("API", "Exchange"): if name.endswith(suffix): name = name[: -len(suffix)] return common.class_name_to_human_name(name, "Signal") @classmethod def get_checkpoint_cls(cls) -> t.Type[state.FetchCheckpointBase]: """Returns the dataclass used to control checkpoint for this API""" return state.FetchCheckpointBase # Default = no checkpoints @classmethod def get_record_cls(cls) -> t.Type[state.FetchedSignalMetadata]: """Returns the dataclass used to store records for this API""" return state.FetchedSignalMetadata # Default = no metadata @classmethod def get_config_class(cls) -> t.Type[CollaborationConfigBase]: """Returns the dataclass used to store records for this API""" return CollaborationConfigBase def resolve_owner(self, id: int) -> str: """ Convert an owner ID into a human readable name (if available). If empty string is returned, a placeholder will be used instead. """ return "" def get_own_owner_id(self, collab: CollaborationConfigBase) -> int: """ Return the owner ID of this caller. Opinions with that ID are "ours". SignalOpinions returned by fetch() where the owner id is the return of get_own_owner_id() are assumed to be owned by you, which can result in some additional metadata being added to matches, for example in the `match` command of the CLI. A default implementation is provided that is assumed to not match any real owner ID. """ return -1 def fetch_once( self, supported_signal_types: t.List[t.Type[SignalType]], collab: CollaborationConfigBase, # None if fetching for the first time, # otherwise the previous FetchDelta returned checkpoint: t.Optional[state.FetchCheckpointBase], ) -> state.FetchDelta: """ Call out to external resources, pulling down one "batch" of content. Many APIs are a sequence of events: (creates/updates, deletions) In that case, it's important the these events are strictly ordered. I.e. if the sequence is create => delete, if the sequence is reversed to delete => create, the end result is a stored record, when the expected is a deleted one. """ raise NotImplementedError def report_seen( self, collab: CollaborationConfigBase, s_type: SignalType, signal: str, metadata: state.FetchedStateStoreBase, ) -> None: """ Report that you observed this signal. This is an optional API, and places that use it should catch the NotImplementError. """ raise NotImplementedError def report_opinion( self, collab: CollaborationConfigBase, s_type: t.Type[SignalType], signal: str, opinion: state.SignalOpinion, ) -> None: """ Weigh in on a signal for this collaboration. Most implementations will want a full replacement specialization, but this allows a common interface for all uploads for the simplest usecases. This is an optional API, and places that use it should catch the NotImplementError. """ raise NotImplementedError def report_true_positive( self, collab: CollaborationConfigBase, s_type: t.Type[SignalType], signal: str, metadata: state.FetchedSignalMetadata, ) -> None: """ Report that a previously seen signal was a true positive. This is an optional API, and places that use it should catch the NotImplementError. """ self.report_opinion( collab, s_type, signal, state.SignalOpinion( owner=self.get_own_owner_id(collab), category=state.SignalOpinionCategory.TRUE_POSITIVE, tags=set(), ), ) def report_false_positive( self, collab: CollaborationConfigBase, s_type: t.Type[SignalType], signal: str, metadata: state.FetchedSignalMetadata, ) -> None: """ Report that a previously seen signal is a false positive. This is an optional API, and places that use it should catch the NotImplementError. """ self.report_opinion( collab, s_type, signal, state.SignalOpinion( owner=self.get_own_owner_id(collab), category=state.SignalOpinionCategory.FALSE_POSITIVE, tags=set(), ), )