python-threatexchange/threatexchange/cli/main.py (172 lines of code) (raw):
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
"""
A wrapper around multi-stage ThreatExchange operations.
Includes simple matching and writing back. Useful for quickly validating new
sources of ThreatExchange data. A possible template for a native
implementation in your own architecture.
This helper heavily relies on a config file to provide consistent behavior
between stages, and a state file to store hashes.
"""
import argparse
from dataclasses import dataclass
from distutils import extension
import logging
import inspect
import os
import sys
import typing as t
import pathlib
from threatexchange import meta
from threatexchange.content_type.content_base import ContentType
from threatexchange.extensions.manifest import ThreatExchangeExtensionManifest
from threatexchange.fb_threatexchange import api as tx_api
from threatexchange.fetcher.apis.file_api import LocalFileSignalExchangeAPI
from threatexchange.fetcher.apis.static_sample import StaticSampleSignalExchangeAPI
from threatexchange.fetcher.apis.fb_threatexchange_api import (
FBThreatExchangeSignalExchangeAPI,
)
from threatexchange.fetcher.apis.stop_ncii_api import StopNCIIAPI
from threatexchange.content_type import photo, video, text, url
from threatexchange.fetcher.fetch_api import SignalExchangeAPI
from threatexchange.signal_type import (
pdq,
md5,
raw_text,
url as url_signal,
url_md5,
trend_query,
)
from threatexchange.cli.cli_config import CLiConfig, CliState
from threatexchange.cli.cli_config import CLISettings
from threatexchange.cli import (
command_base as base,
fetch_cmd,
label_cmd,
dataset_cmd,
hash_cmd,
match_cmd,
config_cmd,
)
from threatexchange.signal_type.signal_base import SignalType
def get_subcommands() -> t.List[t.Type[base.Command]]:
    """
    Return the command classes that make up the CLI's verbs.

    A fresh list is built on every call, so callers may mutate the result
    without affecting later calls.
    """
    registered = (
        config_cmd.ConfigCommand,
        fetch_cmd.FetchCommand,
        match_cmd.MatchCommand,
        label_cmd.LabelCommand,
        dataset_cmd.DatasetCommand,
        hash_cmd.HashCommand,
    )
    return list(registered)
def get_argparse(settings: CLISettings) -> argparse.ArgumentParser:
    """
    Build the top-level argument parser for the CLI.

    The module docstring becomes the parser description, one global option
    (--app-token / -a) is declared, and every class from get_subcommands()
    is asked to attach itself to the "verbs" sub-parser group.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--app-token", "-a", metavar="TOKEN", help="the App token for ThreatExchange"
    )
    verb_parsers = parser.add_subparsers(title="verbs", help="which action to do")
    for command_cls in get_subcommands():
        # Each command knows how to describe its own arguments.
        command_cls.add_command_to_subparser(settings, verb_parsers)
    return parser
def execute_command(settings: CLISettings, namespace) -> None:
    """
    Instantiate the command chosen during argument parsing and run it.

    `namespace` must carry a `command_cls` attribute (asserted). The command
    is constructed with every namespace attribute whose name matches a
    positional-or-keyword parameter of `command_cls.__init__`; if the
    constructor declares `full_argparse_namespace`, the whole namespace is
    passed under that name. The instance's `execute(settings)` is then called.
    """
    assert hasattr(namespace, "command_cls")
    command_cls = namespace.command_cls
    logging.debug("Setup complete, handing off to %s", command_cls.__name__)

    # Since we didn't import click, use hard-to-debug magic to init the command:
    # match parsed argument names against the constructor's parameter names.
    accepted = set(inspect.getfullargspec(command_cls.__init__).args)
    init_kwargs = {}
    for attr_name, attr_value in namespace.__dict__.items():
        if attr_name in accepted:
            init_kwargs[attr_name] = attr_value
    if "full_argparse_namespace" in accepted:
        init_kwargs["full_argparse_namespace"] = namespace

    command_cls(**init_kwargs).execute(settings)
def _get_fb_tx_app_token(config: CLiConfig) -> t.Optional[str]:
    """
    Get the API key from a variety of fallback sources

    Sources are consulted in this order; empty ones are skipped:
      1. the TX_ACCESS_TOKEN environment variable
      2. config.fb_threatexchange_api_token
      3. the contents of ~/.txtoken, if that file exists

    Returns the whitespace-stripped token when the first non-empty source
    passes tx_api.is_valid_app_token(). If that first non-empty source is
    invalid, a warning is printed to stderr and None is returned without
    trying the remaining sources. Returns None when every source is empty.
    """
    file_loc = pathlib.Path("~/.txtoken").expanduser()
    environment_var = "TX_ACCESS_TOKEN"
    # (candidate value, human-readable origin used in the warning message)
    potential_sources = (
        (os.environ.get(environment_var), f"{environment_var} environment variable"),
        (
            config.fb_threatexchange_api_token,
            "`config api fb_threat_exchange --api-token` command",
        ),
        (file_loc.exists() and file_loc.read_text(), f"{file_loc} file"),
    )

    for val, source in potential_sources:
        if not val:
            continue
        val = val.strip()
        if tx_api.is_valid_app_token(val):
            return val
        # Pass the message as a plain string. Wrapping it in parentheses with
        # a trailing comma would make it a 1-tuple, and print() would then emit
        # the tuple's repr (quotes, parentheses and a literal "\n").
        print(
            f"Warning! Your current app token {val!r} (from {source}) is invalid.\n"
            "Double check that it's an 'App Token' from "
            "https://developers.facebook.com/tools/accesstoken/",
            file=sys.stderr,
        )
        # Don't throw because we don't want to block commands that fix this
        return None  # We probably don't expect to fall back here
    return None
class _ExtendedTypes(t.NamedTuple):
    """
    Functionality contributed by extensions, grouped by kind.
    """

    # Content type classes provided by extensions
    content_types: t.List[t.Type[ContentType]]
    # Signal type classes provided by extensions
    signal_types: t.List[t.Type[SignalType]]
    # Already-constructed exchange API objects provided by extensions
    api_instances: t.List[SignalExchangeAPI]
def _get_extended_functionality(config: CLiConfig) -> _ExtendedTypes:
    """
    Load every extension named in `config.extensions` and pool what they offer.

    For each extension module name, its manifest is loaded and the manifest's
    signal types, content types, and APIs are collected. Each entry in the
    manifest's `apis` is called with no arguments to produce the instance
    that is stored. With no extensions configured, all three lists are empty.
    """
    content_types = []
    signal_types = []
    api_instances = []

    for module_name in config.extensions:
        logging.debug("Loading extension %s", module_name)
        manifest = ThreatExchangeExtensionManifest.load_from_module_name(module_name)
        signal_types += manifest.signal_types
        content_types += manifest.content_types
        api_instances += [make_api() for make_api in manifest.apis]

    return _ExtendedTypes(content_types, signal_types, api_instances)
def _get_settings(config: CLiConfig, dir: pathlib.Path) -> CLISettings:
    """
    Configure the behavior and functionality.

    Combines the built-in content types, signal types, and exchange APIs with
    whatever the configured extensions add, then wraps the result (plus a
    CliState rooted at `dir`) in a CLISettings.
    """
    extended = _get_extended_functionality(config)

    builtin_content_types = [
        photo.PhotoContent,
        video.VideoContent,
        url.URLContent,
        text.TextContent,
    ]
    builtin_signal_types = [
        pdq.PdqSignal,
        md5.VideoMD5Signal,
        raw_text.RawTextSignal,
        url_signal.URLSignal,
        url_md5.UrlMD5Signal,
        trend_query.TrendQuerySignal,
    ]
    signal_mapping = meta.SignalTypeMapping(
        builtin_content_types + extended.content_types,
        builtin_signal_types + extended.signal_types,
    )

    builtin_apis = [
        StaticSampleSignalExchangeAPI(),
        LocalFileSignalExchangeAPI(),
        StopNCIIAPI(),
        # The token may be None; see _get_fb_tx_app_token for the lookup order.
        FBThreatExchangeSignalExchangeAPI(_get_fb_tx_app_token(config)),
    ]
    fetcher_mapping = meta.FetcherMapping(builtin_apis + extended.api_instances)

    cli_state = CliState(list(fetcher_mapping.fetchers_by_name.values()), dir=dir)
    functionality = meta.FunctionalityMapping(signal_mapping, fetcher_mapping, cli_state)
    return CLISettings(functionality, cli_state)
def _setup_logging():
    """
    Configure root logging from the TX_VERBOSE environment variable.

    "0" (or unset) selects CRITICAL, "1" selects INFO, and any other value
    selects DEBUG. basicConfig is called with force=True, so calling this
    again replaces the previous configuration.
    """
    level_by_setting = {"0": logging.CRITICAL, "1": logging.INFO}
    setting = os.getenv("TX_VERBOSE", "0")
    logging.basicConfig(
        format="%(asctime)s %(levelname).1s] %(message)s",
        level=level_by_setting.get(setting, logging.DEBUG),
        force=True,
    )
def main(
    args: t.Optional[t.Sequence[t.Text]] = None,
    state_dir: pathlib.Path = pathlib.Path("~/.threatexchange"),
) -> None:
    """
    Run the CLI: set up logging, load config, parse `args`, run the command.

    `args` is handed to argparse's parse_args(); None makes argparse read
    sys.argv. `state_dir` is where CliState keeps its persistent data.
    """
    _setup_logging()

    # TODO fix the circular dependency
    # A throwaway CliState (no fetchers) is used only to read the persisted
    # config, which is then needed to build the real settings and state.
    bootstrap_state = CliState([], state_dir)
    config = bootstrap_state.get_persistent_config()

    settings = _get_settings(config, state_dir)
    parsed = get_argparse(settings).parse_args(args)
    execute_command(settings, parsed)
if __name__ == "__main__":
    # Script entry point: translate known failures into exit codes instead of
    # letting a traceback reach the user.
    try:
        _setup_logging()
        main()
    except base.CommandError as command_error:
        # Expected CLI failure: show the message and use the code it carries.
        print(command_error, file=sys.stderr)
        sys.exit(command_error.returncode)
    except KeyboardInterrupt:
        # No stack for CTRL+C
        sys.exit(130)