python-threatexchange/threatexchange/fetcher/apis/stop_ncii_api.py (27 lines of code) (raw):
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
"""
SignalExchangeAPI impl StopNCII.org
"""
import time
import typing as t
from dataclasses import dataclass
from threatexchange.fetcher import fetch_state as state
from threatexchange.fetcher.fetch_api import SignalExchangeAPI
from threatexchange.fetcher.collab_config import (
CollaborationConfigBase,
)
from threatexchange.signal_type.signal_base import SignalType
@dataclass
class StopNCIICheckpoint(
state.FetchCheckpointBase,
):
update_time: int
last_fetch_time: int
def is_stale(self) -> bool:
"""Consider stale after 30d of not fetching"""
return time.time() - self.last_fetch_time > 3600 * 24 * 30
def get_progress_timestamp(self) -> t.Optional[int]:
return self.update_time
class StopNCIIAPI(SignalExchangeAPI):
def fetch_once( # type: ignore[override] # fix with generics on base
self,
supported_signal_types: t.List[t.Type[SignalType]],
collab: CollaborationConfigBase,
checkpoint: t.Optional[StopNCIICheckpoint],
) -> state.FetchDelta:
# TODO
raise NotImplementedError("TODO not yet implemented")
# now = int(time.time())
# return SimpleFetchDelta({}, StopNCIICheckpoint(now, now, True))