python-threatexchange/threatexchange/cli/cli_config.py (191 lines of code) (raw):
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
"""
Local storage and configuration for the CLI.
The CLI and Hasher-Matcher-Actioner are roughly parallel, but this isn't a
scalable service running on AWS. Instead, we keep all of our state in
a single local directory (likely ~/.threatexchange)
"""
from dataclasses import dataclass, field
import sys
import typing as t
import json
import pathlib
import logging
from dacite import WrongTypeError
from threatexchange.fetcher import collab_config
from threatexchange.fetcher.fetch_api import SignalExchangeAPI
from threatexchange.content_type import content_base
from threatexchange.fetcher.fetch_state import FetchedStateStoreBase
from threatexchange.fetcher.apis.static_sample import StaticSampleSignalExchangeAPI
from threatexchange.signal_type import signal_base
from threatexchange.meta import FunctionalityMapping
from threatexchange.cli.cli_state import CliSimpleState, CliIndexStore
from threatexchange.cli import dataclass_json as cli_json
# File name of the CLI's JSON config file, relative to the state directory.
CONFIG_FILENAME = "config.json"
@dataclass
class CLiConfig:
    """Miscellaneous CLI settings that are persisted between invocations."""

    # Token for the Facebook ThreatExchange API; None when no token is stored.
    fb_threatexchange_api_token: t.Optional[str] = field(default=None)
    # Names of enabled extensions.
    # NOTE(review): presumably importable module names - confirm with the loader.
    extensions: t.Set[str] = field(default_factory=set)
class CliState(collab_config.CollaborationConfigStoreBase):
    """
    A wrapper around stateful information stored for the CLI.

    Everything is just in a single directory (usually ~/.threatexchange),
    laid out as:

        config.json        the persisted CLiConfig
        collab_configs/    one "<name>.json" file per collaboration
        fetched/           one subdirectory per SignalExchangeAPI name
        index/             the index directory (see index_dir)

    Collaboration configs are parsed lazily on the first get_all_collabs()
    call and then cached on the instance. update_collab() and
    delete_collab() drop that cache so later reads reflect what is on disk.
    """

    def __init__(
        self,
        fetch_types: t.List[t.Union[SignalExchangeAPI, t.Type[SignalExchangeAPI]]],
        dir: pathlib.Path,
    ):
        """
        Args:
            fetch_types: the APIs (classes or instances) whose get_name() and
                get_config_class() are used to pick the config dataclass for
                each stored collaboration file.
            dir: root directory for all state; a leading "~" is expanded.

        Creates the directory layout and an empty config file if missing.
        """
        self._dir = dir.expanduser()
        # API name (the "api" key of a stored config) -> config dataclass
        self._name_to_ctype = {
            ft.get_name(): ft.get_config_class() for ft in fetch_types
        }
        # None means "not loaded yet"; otherwise collab name -> parsed config
        self._cache: t.Optional[
            t.Dict[str, collab_config.CollaborationConfigBase]
        ] = None
        self._init_folders_if_needed()

    def _init_folders_if_needed(self):
        """Create the state subdirectories and an empty config file if absent."""
        for d in (self.collab_dir, self.index_dir, self.fetched_state_dir):
            # exist_ok avoids the check-then-create race of is_dir() + mkdir()
            d.mkdir(parents=True, exist_ok=True)
        cfg = self.config_file
        if not cfg.is_file():
            cfg.write_text("{}")

    @property
    def collab_dir(self) -> pathlib.Path:
        """Directory holding one JSON file per collaboration config."""
        return self._dir / "collab_configs/"

    @property
    def fetched_state_dir(self) -> pathlib.Path:
        """Parent directory of the per-API fetched state directories."""
        return self._dir / "fetched"

    @property
    def index_dir(self) -> pathlib.Path:
        """Directory reserved for indices."""
        return self._dir / "index/"

    @property
    def config_file(self) -> pathlib.Path:
        """Path of the JSON file that stores the persisted CLiConfig."""
        return self._dir / CONFIG_FILENAME

    def path_for_collab_config(
        self, config: collab_config.CollaborationConfigBase
    ) -> pathlib.Path:
        """Return the file a collaboration is stored in: "<name>.json"."""
        return self.collab_dir / f"{config.name}.json"

    def get_persistent_config(self) -> CLiConfig:
        """Load the CLiConfig from the config file (default: empty CLiConfig)."""
        return cli_json.dataclass_load_file(
            self.config_file, CLiConfig, default=CLiConfig()
        )

    def update_persistent_config(self, config: CLiConfig):
        """Overwrite the config file with the given CLiConfig."""
        cli_json.dataclass_dump_file(self.config_file, config)

    def dir_for_fetched_state(
        self,
        api: t.Type[SignalExchangeAPI],
    ) -> pathlib.Path:
        """Return the fetched-state directory for an API, keyed by its name."""
        return self.fetched_state_dir / f"{api.get_name()}/"

    def get_collab_names_without_loading(self) -> t.List[str]:
        """
        List collaboration names without parsing any config file.

        If the configs are already cached, the cached names are returned.
        Otherwise names are derived from the "<name>.json" file names in the
        collab directory, so the result may include files that would later
        be rejected by get_all_collabs().
        """
        if self._cache is not None:
            return list(self._cache)
        # stem inverts path_for_collab_config(): "<name>.json" -> "<name>"
        return [p.stem for p in self.collab_dir.glob("*.json")]

    def _load_collab_file(
        self, path: pathlib.Path
    ) -> t.Optional[collab_config.CollaborationConfigBase]:
        """Parse one collab config file; log and return None if unusable."""
        if not path.is_file():
            logging.warning("Ignoring strange file in collab dir: %s", path)
            return None
        try:
            with path.open() as fp:
                content = json.load(fp)
        except json.JSONDecodeError:
            logging.exception("Failed to parse collab config: %s", path)
            return None
        api_name = content.get("api") if isinstance(content, dict) else None
        # Only a string can name an API. The check also keeps unhashable JSON
        # values (lists/objects) away from the dict lookup.
        ctype = (
            self._name_to_ctype.get(api_name) if isinstance(api_name, str) else None
        )
        if ctype is None:
            logging.warning("Ignoring collab config of unknown type: %s", path)
            return None
        try:
            return cli_json.dataclass_load_dict(content, ctype)
        except WrongTypeError:
            logging.exception("Failed to parse collab config: %s", path)
            return None

    def get_all_collabs(self) -> t.List[collab_config.CollaborationConfigBase]:
        """
        Get all CollaborationConfigs, already resolved to the correct type

        Files that cannot be parsed or whose "api" is unknown are logged and
        skipped. The result is cached until a collab is updated or deleted.
        """
        if self._cache is None:
            loaded = (
                self._load_collab_file(p) for p in self.collab_dir.glob("*.json")
            )
            self._cache = {c.name: c for c in loaded if c is not None}
        return list(self._cache.values())

    def update_collab(self, collab: collab_config.CollaborationConfigBase) -> None:
        """Create or update a collaboration"""
        path = self.path_for_collab_config(collab)
        cli_json.dataclass_dump_file(path, collab)
        # Drop the cache so the next read reflects what is now on disk
        self._cache = None

    def delete_collab(self, collab: collab_config.CollaborationConfigBase) -> None:
        """Delete a collaboration"""
        self.path_for_collab_config(collab).unlink(missing_ok=True)
        # Drop the cache so the deleted collab is not served from memory
        self._cache = None
class CLISettings:
    """
    A God object for all miscellaneous persisted state to make the CLI work

    It combines the FunctionalityMapping (which content types, signal types
    and fetchers exist) with the on-disk CliState.
    """

    def __init__(self, mapping: FunctionalityMapping, cli_state: CliState) -> None:
        self._state = cli_state
        self._mapping = mapping
        # Filled in on the first get_persistent_config() call
        self._config: t.Optional[CLiConfig] = None
        # The "using sample data" notice goes to stderr at most once
        self._sample_message_printed = False
        self.index_store = CliIndexStore(cli_state.index_dir)

    @property
    def _types(self):
        """Shortcut to the signal/content part of the mapping."""
        return self._mapping.signal_and_content

    def get_persistent_config(self) -> CLiConfig:
        """Return the persisted config, reading it from the state only once."""
        cached = self._config
        if cached is None:
            cached = self._config = self._state.get_persistent_config()
        return cached

    def set_persistent_config(self, config: CLiConfig) -> None:
        """Write the config through to the state, then remember it."""
        self._state.update_persistent_config(config)
        self._config = config

    def get_all_content_types(self) -> t.List[t.Type[content_base.ContentType]]:
        """All content types known to the mapping."""
        by_name = self._types.content_by_name
        return list(by_name.values())

    def get_content_type(self, name: str) -> t.Type[content_base.ContentType]:
        """Look up a content type by name (KeyError if the dict lacks it)."""
        by_name = self._types.content_by_name
        return by_name[name]

    def get_all_signal_types(self) -> t.List[t.Type[signal_base.SignalType]]:
        """All signal types known to the mapping."""
        by_name = self._types.signal_type_by_name
        return list(by_name.values())

    def get_signal_type(self, name: str) -> t.Type[signal_base.SignalType]:
        """Look up a signal type by name (KeyError if the dict lacks it)."""
        by_name = self._types.signal_type_by_name
        return by_name[name]

    def get_signal_types_for_content(
        self, content_type: t.Type[content_base.ContentType]
    ) -> t.List[t.Type[signal_base.SignalType]]:
        """Signal types the mapping associates with the given content type."""
        by_content = self._types.signal_type_by_content
        return by_content[content_type]

    def get_fetchers(self):
        """All fetchers registered in the mapping, as a new list."""
        return list(self._mapping.fetcher.fetchers_by_name.values())

    def get_api_for_collab(
        self, collab: collab_config.CollaborationConfigBase
    ) -> SignalExchangeAPI:
        """Return the fetcher registered under the collab's api name."""
        fetchers = self._mapping.fetcher.fetchers_by_name
        return fetchers[collab.api]

    def get_fetch_store_for_fetcher(
        self, fetcher: t.Type[SignalExchangeAPI]
    ) -> FetchedStateStoreBase:
        """Build the fetched-state store rooted at the fetcher's directory."""
        store_dir = self._state.dir_for_fetched_state(fetcher)
        return CliSimpleState(fetcher, store_dir)

    def get_fetch_store_for_collab(
        self, collab: collab_config.CollaborationConfigBase
    ) -> FetchedStateStoreBase:
        """Build the fetched-state store for the collab's API class."""
        api = self._mapping.fetcher.fetchers_by_name[collab.api]
        return self.get_fetch_store_for_fetcher(api.__class__)

    @property
    def in_demo_mode(self) -> bool:
        """Has no live collabs"""
        live_collabs = self._state.get_all_collabs()
        return not live_collabs

    def get_all_collabs(
        self, *, default_to_sample: bool = False
    ) -> t.List[collab_config.CollaborationConfigBase]:
        """
        Return the stored collabs.

        With default_to_sample=True and no stored collabs, a single sample
        collab is returned instead of an empty list.
        """
        if not (self.in_demo_mode and default_to_sample):
            # Should this check whether the APIs are all valid?
            return self._state.get_all_collabs()
        return [self._get_sample_collab()]

    def get_collab(
        self,
        name: str,
    ) -> t.Optional[collab_config.CollaborationConfigBase]:
        """Delegate the lookup by name to the underlying state."""
        return self._state.get_collab(name)

    def _get_sample_collab(self) -> collab_config.CollaborationConfigBase:
        """
        Build the stand-in collab that points at the static sample API.

        The first call also prints a one-time notice to stderr.
        """
        if not self._sample_message_printed:
            notice = (
                "Looks like you haven't set up a collaboration config, "
                "so using the sample one against sample data"
            )
            print(notice, file=sys.stderr)
            self._sample_message_printed = True
        sample_api_name = StaticSampleSignalExchangeAPI.get_name()
        every_signal_name = {s.get_name() for s in self.get_all_signal_types()}
        return collab_config.CollaborationConfigBase(
            "Sample Signals",
            sample_api_name,
            enabled=True,
            only_signal_types=every_signal_name,
            not_signal_types=set(),
            only_owners=set(),
            not_owners=set(),
            only_tags=set(),
            not_tags=set(),
        )

    def get_collabs_for_fetcher(
        self, fetcher: SignalExchangeAPI
    ) -> t.List[collab_config.CollaborationConfigBase]:
        """Return the collabs whose api equals the fetcher's name."""
        wanted_api = fetcher.get_name()
        matching = []
        for collab in self.get_all_collabs():
            if collab.api == wanted_api:
                matching.append(collab)
        return matching