python-threatexchange/threatexchange/cli/match_cmd.py (171 lines of code) (raw):

#!/usr/bin/env python
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved

"""
Match command for parsing simple data sources against the dataset.
"""

import argparse
import logging
import pathlib
import sys
import typing as t

from threatexchange import common
from threatexchange.cli.fetch_cmd import FetchCommand
from threatexchange.fetcher.fetch_state import FetchedSignalMetadata
from threatexchange.signal_type.index import IndexMatch
from threatexchange.cli.exceptions import CommandError
from threatexchange.signal_type.signal_base import SignalType
from threatexchange.cli.cli_config import CLISettings
from threatexchange.content_type.content_base import ContentType
from threatexchange.signal_type.signal_base import MatchesStr, TextHasher, FileHasher
from threatexchange.cli import command_base


class MatchCommand(command_base.Command):
    """
    Match content to items in ThreatExchange.

    Using the dataset from the fetch command, try to match content. Not all
    content and hashing types are implemented, so it's possible that you
    can download signals, but not match them via this command. In some cases
    the implementation in this package is sub-optimal, either in completeness
    (i.e. only matching exact when near-matching is supported), or in runtime
    (i.e. using a linear implementation when a sublinear implementation exists)

    The output of this command is in the following format:

      <matched descriptor id> <signal type> <label1> <label2...>

    If tying this into your own integrity systems, if the result of this match
    is human review, you'll want to store the matched descriptor id and make a
    call to

      all_in_one label descriptor <matched descriptor id>

    with the results of that review.
    """

    # NOTE(review): execute() below actually prints
    #   <signal type name> - (<collab>) <fetched metadata>
    # which does not obviously match the format described in the docstring
    # above - confirm which one is authoritative.

    # Content token that means "read newline-separated tokens from stdin".
    USE_STDIN = "-"

    @classmethod
    def init_argparse(cls, settings: CLISettings, ap) -> None:
        """
        Register this command's arguments on the argparse parser `ap`.

        The choices offered for `content_type` and `--only-signal` are the
        names of the content/signal types known to `settings`; the parsed
        values are converted through `settings.get_content_type` and
        `settings.get_signal_type` respectively.
        """
        ap.add_argument(
            "content_type",
            type=common.argparse_choices_pre_type(
                [c.get_name() for c in settings.get_all_content_types()],
                settings.get_content_type,
            ),
            help="what kind of content to match",
        )
        ap.add_argument(
            "--only-signal",
            "-S",
            type=common.argparse_choices_pre_type(
                [s.get_name() for s in settings.get_all_signal_types()],
                settings.get_signal_type,
            ),
            help="limit to this signal type",
        )
        ap.add_argument(
            "--hashes",
            "-H",
            action="store_true",
            help=(
                "force input to be interpreted "
                "as signals for the given signal type"
            ),
        )
        ap.add_argument(
            "--inline",
            "-I",
            action="store_true",
            help=("force input to be intepreted inline instead of as files"),
        )
        ap.add_argument(
            "content",
            nargs="+",
            help=(
                "what to scan for matches. By default assumes filenames. "
                "Use '-' to read newline-separated stdin"
            ),
        )
        ap.add_argument(
            "--show-false-positives",
            action="store_true",
            help="show matches even if you've marked them false_positive",
        )
        ap.add_argument(
            "--hide-disputed",
            action="store_true",
            help="hide matches if someone has disputed them",
        )

    def __init__(
        self,
        content_type: t.Type[ContentType],
        only_signal: t.Optional[t.Type[SignalType]],
        hashes: bool,
        inline: bool,
        content: t.List[str],
        show_false_positives: bool,
        hide_disputed: bool,
    ) -> None:
        """
        Store the parsed arguments and validate the signal/content pairing.

        Raises:
            CommandError: (code 2) if `only_signal` is given but does not
                list `content_type` among its content types.
        """
        self.content_type = content_type
        self.only_signal = only_signal
        # parse_input() is a generator function, so nothing is read (and no
        # file is checked for existence) until execute() iterates it.
        self.input_generator = self.parse_input(content, hashes, inline)
        self.inline = inline
        self.as_hashes = hashes
        self.show_false_positives = show_false_positives
        self.hide_disputed = hide_disputed

        if only_signal and content_type not in only_signal.get_content_types():
            raise CommandError(
                f"{only_signal.get_name()} does not "
                f"apply to {content_type.get_name()}",
                2,
            )

    def parse_input(
        self,
        input_: t.Iterable[str],
        input_is_hashes: bool,
        inline: bool,
        no_stderr=False,
    ) -> t.Generator[t.Union[str, pathlib.Path], None, None]:
        """
        Lazily turn raw content tokens into items to match.

        Args:
            input_: tokens to interpret (command line values, or lines of a
                stream); trailing whitespace is stripped from each.
            input_is_hashes: if true, tokens that resolve to files are opened
                and each of their lines is yielded as an inline string
                instead of yielding the path itself.
            inline: if true, tokens are yielded as-is; otherwise each token
                must name an existing file and is yielded as a `pathlib.Path`.
            no_stderr: when true, the `USE_STDIN` token is NOT expanded into
                the contents of stdin. Recursive calls set this so a literal
                "-" read from stdin or from a file is treated as ordinary
                input. (The name is kept for backwards compatibility.)

        Yields:
            `str` for inline tokens, `pathlib.Path` for file tokens.

        Raises:
            CommandError: (code 2) if a non-inline token is not a file.
        """

        def interpret_token(tok: str) -> t.Union[str, pathlib.Path]:
            if inline:
                return tok
            # Use the function's own argument rather than reaching for the
            # loop variable of the enclosing scope.
            path = pathlib.Path(tok)
            if not path.is_file():
                raise CommandError(f"No such file {path}", 2)
            return path

        for token in input_:
            token = token.rstrip()
            if not no_stderr and token == self.USE_STDIN:
                yield from self.parse_input(
                    sys.stdin, input_is_hashes, inline, no_stderr=True
                )
                continue
            parsed = interpret_token(token)
            if input_is_hashes and isinstance(parsed, pathlib.Path):
                # Keep the file open only while its lines are being consumed,
                # and make sure it is closed afterwards.
                with parsed.open("r") as hash_file:
                    yield from self.parse_input(
                        hash_file,
                        input_is_hashes=True,
                        inline=True,
                        no_stderr=True,
                    )
            else:
                yield parsed

    def _make_query(
        self, s_type: t.Type[SignalType], index: t.Any
    ) -> t.Optional[t.Callable[[t.Any], t.List[IndexMatch]]]:
        """
        Build the callable that queries `index` for one piece of input.

        `s_type` and `index` are bound as parameters of this call, so every
        returned callable keeps its own pair. (Creating the lambdas directly
        inside the loop in execute() would make them all share the loop
        variables and therefore all query the last index.)

        Returns None when running inline and `s_type` is neither a
        `TextHasher` nor a `MatchesStr`.
        """
        if not self.inline:
            return lambda file: index.query(s_type.hash_from_file(file))  # type: ignore
        if issubclass(s_type, TextHasher):
            return lambda text: index.query(s_type.hash_from_str(text))  # type: ignore
        if issubclass(s_type, MatchesStr):
            return lambda text: index.query(text)  # type: ignore
        return None

    def execute(self, settings: CLISettings) -> None:
        """
        Match every input item against the available indices and print hits.

        For each input, one line is printed per collaboration the first time
        it appears in the results (across all signal types):
        the signal type name, "- (<collab>)", and the fetched metadata.

        Raises:
            CommandError: if no indices are available and `settings` is not
                in demo mode (in demo mode a fetch is run instead).
        """
        if not settings.index_store.get_available():
            if not settings.in_demo_mode:
                raise CommandError("No indices available. Do you need to fetch?")
            self.stderr("You haven't built any indices, so we'll call `fetch` for you!")
            FetchCommand().execute(settings)

        signal_types = settings.get_signal_types_for_content(self.content_type)
        if self.only_signal:
            signal_types = [self.only_signal]
        # Inline input can only be handled by types that work on strings,
        # file input only by types that can hash a file.
        # NOTE(review): self.as_hashes is not consulted here, so with --hashes
        # and without --inline the strings read from the hash files still go
        # through hash_from_file() - confirm whether that is intended.
        if self.inline:
            signal_types = [
                s for s in signal_types if issubclass(s, (TextHasher, MatchesStr))
            ]
        else:
            signal_types = [s for s in signal_types if issubclass(s, FileHasher)]

        logging.info(
            "Signal types that apply: %s",
            ", ".join(s.get_name() for s in signal_types) or "None!",
        )

        matchers = []
        for s_type in signal_types:
            index = settings.index_store.load_index(s_type)
            if index is None:
                logging.info("No index for %s, skipping", s_type.get_name())
                continue
            query = self._make_query(s_type, index)
            if query:
                matchers.append((s_type, query))

        if not matchers:
            self.stderr("No data to match against")
            return

        for inp in self.input_generator:
            # Collaborations already reported for this particular input.
            seen = set()
            for s_type, matcher in matchers:
                results: t.List[IndexMatch] = matcher(inp)
                for r in results:
                    metadatas: t.List[t.Tuple[str, FetchedSignalMetadata]] = r.metadata
                    for collab, fetched_data in metadatas:
                        if collab in seen:
                            continue
                        seen.add(collab)
                        print(s_type.get_name(), f"- ({collab})", fetched_data)