python-threatexchange/threatexchange/cli/hash_cmd.py (79 lines of code) (raw):

#!/usr/bin/env python
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved

"""
Hash command to convert content into signatures.
"""

import pathlib
import sys
import typing as t

from threatexchange.cli.cli_config import CLISettings
from threatexchange.signal_type.signal_base import BytesHasher, FileHasher, TextHasher
from threatexchange.cli import command_base


# TODO consider refactor to handle overlap with match
class HashCommand(command_base.Command):
    """
    Hash content into signatures (aka hashes).

    Reads inputs as filenames by default, though it will attempt to read
    inline with --inline. Most useful with content type `text`.

    You can also pass in via stdin by using "-" as the content.
    """

    # Sentinel value for `content` meaning "read the inputs from stdin".
    USE_STDIN = "-"

    @classmethod
    def init_argparse(cls, settings: CLISettings, ap) -> None:
        """
        Register this command's arguments on `ap`.

        Only signal types from `settings` that subclass TextHasher or
        BytesHasher are offered, and the `content_type` choices are the
        content types those signal types report.
        """
        signal_types = [
            s
            for s in settings.get_all_signal_types()
            if issubclass(s, (TextHasher, BytesHasher))
        ]

        # Deduplicate with a set, then sort so the order of the choices in
        # the usage/help text does not depend on set iteration order.
        content_type_names = sorted(
            {c.get_name() for s in signal_types for c in s.get_content_types()}
        )

        ap.add_argument(
            "content_type",
            choices=content_type_names,
            help="what kind of content to hash",
        )

        ap.add_argument(
            "--signal-type",
            "-S",
            choices=[s.get_name() for s in signal_types],
            help="only generate these signal types",
        )

        ap.add_argument(
            "--inline",
            "-I",
            action="store_true",
            help="interpret content inline instead of as filenames",
        )

        ap.add_argument(
            "content",
            nargs="+",
            help="list of content or '-' for stdin",
        )

    def __init__(
        self,
        content_type: str,
        signal_type: t.Optional[str],
        inline: bool,
        content: t.Union[t.List[str], t.TextIO],
    ) -> None:
        """
        Store the parsed arguments and prepare the lazy input stream.

        The parameter names mirror the arguments registered in
        `init_argparse`. When `content` is exactly `["-"]`, the inputs are
        read line by line from stdin instead.
        """
        self.content_type_str = content_type
        self.signal_type = signal_type

        if content == [self.USE_STDIN]:
            content = sys.stdin

        # A generator: nothing is read from `content` until execute() runs.
        self.input_generator = self._parse_input(content, inline)

    def _parse_input(
        self,
        input_: t.Iterable[str],
        force_input_to_text: bool,
    ) -> t.Generator[t.Union[str, pathlib.Path], None, None]:
        """
        Yield each input token as inline text or as a file path.

        Trailing whitespace (such as the newline on lines read from stdin)
        is stripped from every token. With `force_input_to_text` the token
        is yielded as a `str`; otherwise it is wrapped in `pathlib.Path`.
        """
        for token in input_:
            token = token.rstrip()
            if force_input_to_text:
                yield token
            else:
                yield pathlib.Path(token)

    def execute(self, settings: CLISettings) -> None:
        """
        Hash every input and print one `<signal type name> <hash>` line each.

        Candidate signal types are those `settings` reports for the chosen
        content type, narrowed to `--signal-type` when one was given.
        Inline (`str`) inputs are hashed with `hash_from_str` by the
        TextHasher subclasses; path inputs are hashed with `hash_from_file`
        by the FileHasher subclasses. Falsy hash results are not printed.
        """
        content_type = settings.get_content_type(self.content_type_str)
        all_signal_types = [
            s
            for s in settings.get_signal_types_for_content(content_type)
            # A signal_type of None means "no filter": keep every type.
            if self.signal_type in (None, s.get_name())
        ]

        file_hashers = [s for s in all_signal_types if issubclass(s, FileHasher)]
        str_hashers = [s for s in all_signal_types if issubclass(s, TextHasher)]

        for inp in self.input_generator:
            # Lazy (signal type, hash) pairs; each hash is still computed
            # immediately before its own print, one signal type at a time.
            hashes: t.Iterator[t.Tuple[t.Any, str]]
            if isinstance(inp, str):
                hashes = ((s, s.hash_from_str(inp)) for s in str_hashers)
            else:
                hashes = ((s, s.hash_from_file(inp)) for s in file_hashers)

            for signal_type, hash_str in hashes:
                if hash_str:
                    print(signal_type.get_name(), hash_str)