python-threatexchange/threatexchange/cli/label_cmd.py (131 lines of code) (raw):

# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved from argparse import ArgumentParser, ArgumentTypeError import logging import typing as t from threatexchange.fetcher.fetch_state import SignalOpinion, SignalOpinionCategory from threatexchange.signal_type.signal_base import MatchesStr, SignalType, TextHasher from threatexchange import common from threatexchange.cli.cli_config import CLISettings from threatexchange.content_type.content_base import ContentType from threatexchange.fetcher.collab_config import CollaborationConfigBase from threatexchange.cli import command_base class LabelCommand(command_base.Command): """ Label signals and content for sharing. There are three main types of labeling: 1. Seen Marking that you've observed a match, which can help others prioritize review, or track cross-platform spread. 2. True Positive / False Positive After you've confirmed the results of a match, contributing what that result is can help others priotize signals with more precision. 3. Upload If you have your own curated signals, sharing them with others can help them find matches, and give them opportunities to label your signals. Examples: $ threatexchange label "Sample Collab" text """ @classmethod def init_argparse(cls, settings: CLISettings, ap: ArgumentParser) -> None: label_with = ap.add_mutually_exclusive_group() label_with.add_argument( "--tags", type=lambda s: set(s.strip().split(",")), metavar="CSV", default=set(), help="tags to apply to item", ) label_with.add_argument( "--seen", action="store_true", help="tags to apply to item", ) label_with.add_argument( "--false-positive", action="store_true", help="tags to apply to item", ) label_with.add_argument( "--true-positive", action="store_true", help="tags to apply to item", ) ap.add_argument( "--only-signals", "-S", nargs="+", type=common.argparse_choices_pre_type( [s.get_name() for s in settings.get_all_signal_types()], settings.get_signal_type, ), default=[], help="limit to this signal type", ) ap.add_argument( "--is-hash", "-H", action="store_true", help="interpret content as a hash (requires a single -S)", ) ap.add_argument( "collab", type=lambda n: _collab_type(n, settings), help="The name of the collaboration", ) ap.add_argument( "content_type", type=common.argparse_choices_pre_type( [c.get_name() for c in settings.get_all_content_types()], settings.get_content_type, ), help="the type of what you are labeling", ) ap.add_argument( "content", help="the content you are labeling", ) def __init__( self, content_type: t.Type[ContentType], content: str, is_hash: bool, collab: CollaborationConfigBase, only_signals: t.List[t.Type[SignalType]], tags: t.Set[str], true_positive: bool, false_positive: bool, seen: bool, ) -> None: self.collab = collab self.content_type = content_type self.content = content self.tags = tags self.only_signals = only_signals self.is_hash = is_hash if is_hash: if len(self.only_signals) != 1: raise ArgumentTypeError("[-H] use only one argument for -S") if self.collab is None: raise ArgumentTypeError("No such collaboration!") self.action = self.execute_upload if true_positive: self.action = self.execute_true_positive elif false_positive: self.action = self.execute_false_positive elif seen: self.action = self.execute_seen def execute(self, settings: CLISettings) -> None: self.action(settings) def execute_upload(self, settings: CLISettings) -> None: api = settings.get_api_for_collab(self.collab) signal_types = self.only_signals or settings.get_signal_types_for_content( self.content_type ) if self.is_hash: hash_val = signal_types[0].validate_signal_str(self.content) api.report_opinion( self.collab, signal_types[0], hash_val, SignalOpinion( api.get_own_owner_id(self.collab), SignalOpinionCategory.TRUE_POSITIVE, self.tags, ), ) raise NotImplementedError def execute_seen(self, settings: CLISettings) -> None: raise NotImplementedError def execute_true_positive(self, settings: CLISettings) -> None: raise NotImplementedError def execute_false_positive(self, settings: CLISettings) -> None: raise NotImplementedError def _collab_type(name: str, settings: CLISettings) -> CollaborationConfigBase: ret = settings.get_collab(name) if ret is None: raise ArgumentTypeError(f"No such collab '{name}'!") return ret