python-threatexchange/threatexchange/fetcher/apis/file_api.py (60 lines of code) (raw):

# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved

"""
The fetcher is the component that talks to external APIs to get and put signals

@see SignalExchangeAPI
"""

import os
import typing as t
from dataclasses import dataclass
from pathlib import Path

from threatexchange.fetcher.simple.state import (
    SimpleFetchDelta,
)
from threatexchange.fetcher import fetch_state as state
from threatexchange.fetcher.fetch_api import SignalExchangeAPI
from threatexchange.fetcher.collab_config import (
    CollaborationConfigBase,
    DefaultsForCollabConfigBase,
)
from threatexchange.signal_type.signal_base import SignalType


@dataclass
class FileCollaborationConfig(CollaborationConfigBase, DefaultsForCollabConfigBase):
    """Collaboration config pointing at a signal file on the local disk."""

    # Path of the file that fetch_once() reads and report_opinion() appends to.
    filename: str
    # When set, every non-empty line of the file is taken as a signal of this
    # type. When None, each line is parsed as "<signal_type> <signal>".
    signal_type: t.Optional[str]


class LocalFileSignalExchangeAPI(SignalExchangeAPI):
    """
    Read simple signal files off the local disk.
    """

    @classmethod
    def get_config_class(cls) -> t.Type[FileCollaborationConfig]:
        """Return the config dataclass used by this API."""
        return FileCollaborationConfig

    def fetch_once(  # type: ignore[override]  # fix with generics on base
        self,
        _supported_signal_types: t.List[t.Type[SignalType]],
        collab: FileCollaborationConfig,
        _checkpoint: t.Optional[state.FetchCheckpointBase],
    ) -> state.FetchDelta:
        """
        Fetch the whole file.

        The file is re-read in full on every call; `_supported_signal_types`
        and `_checkpoint` are not consulted. Lines that do not yield both a
        signal type and a signal (for example blank lines) are skipped.

        Returns a SimpleFetchDelta, marked done, holding one entry per
        distinct (signal_type, signal) pair found in the file.

        Raises AssertionError if `collab.filename` does not exist or is not
        a regular file.
        """
        path = Path(collab.filename)
        assert path.exists(), f"No such file {path}"
        assert path.is_file(), f"{path} is not a file (is it a dir?)"

        updates: t.Dict[t.Tuple[str, str], state.FetchedSignalMetadata] = {}
        # TODO - Support things other than just one item per line
        with path.open("r") as f:
            for line in f:
                signal_type = collab.signal_type
                signal = line.strip()
                if signal_type is None:
                    # Untyped file: the first space-separated token is the type
                    signal_type, _, signal = signal.partition(" ")
                if signal_type and signal:
                    updates[signal_type, signal] = state.FetchedSignalMetadata()
        return SimpleFetchDelta(updates, state.FetchCheckpointBase(), done=True)

    def report_opinion(  # type: ignore[override]  # fix with generics on base
        self,
        collab: FileCollaborationConfig,
        s_type: t.Type[SignalType],
        signal: str,
        opinion: state.SignalOpinion,
    ) -> None:
        """
        Append a "<signal type name> <signal>" line to the collaboration file.

        Only untagged TRUE_POSITIVE opinions are supported.

        Raises NotImplementedError if the opinion has any other category or
        carries tags. Raises FileNotFoundError if the file does not exist.
        """
        if opinion.category != state.SignalOpinionCategory.TRUE_POSITIVE:
            raise NotImplementedError
        if opinion.tags:
            raise NotImplementedError

        path = Path(collab.filename)
        with path.open("rb") as f:
            f.seek(0, os.SEEK_END)
            if f.tell() == 0:
                # Empty file: nothing to terminate, and seeking to -1 from
                # the end would raise OSError.
                has_newline = True
            else:
                f.seek(-1, os.SEEK_END)
                has_newline = f.read(1) == b"\n"

        # Appending will overwrite previous ones (fetch_once keys its results
        # by (type, signal), so repeats collapse), and compaction is for scrubs
        # NOTE(review): the line is always written in "<type> <signal>" form,
        # even when collab.signal_type is set and fetch_once would read the
        # whole line as the signal - confirm this is intended.
        with path.open("a") as f:
            nl = "" if has_newline else "\n"
            f.write(f"{nl}{s_type.get_name()} {signal}\n")