hasher-matcher-actioner/hmalib/scripts/populate_config_db.py (175 lines of code) (raw):

#! /usr/bin/env python3 # Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved # type: ignore """ Write simple configs to the HMA config database To run with defualt configs: $ python3 -m hmalib.scripts.populate_config_db load_example_configs """ import argparse from dataclasses import fields import getpass import json import tempfile import subprocess import typing as t import re from botocore.exceptions import ClientError from hmalib.common.configs.fetcher import ThreatExchangeConfig from hmalib.common.configs.evaluator import ActionRule from hmalib.common.classification_models import ( BankedContentIDClassificationLabel, BankIDClassificationLabel, ClassificationLabel, ActionLabel, ) from hmalib.common import config as hmaconfig from hmalib.common.configs.actioner import WebhookPostActionPerformer SUPPORTED_CONFIGS = [ ThreatExchangeConfig, ] def better_bool_type(s: str): s = s.strip().lower() if s in ("true", "1"): return True if s in ("false", "0"): return False raise argparse.ArgumentTypeError("for bools use 'true' or 'false'") def get_configs(args): config_cls = {config_cls.__name__: config_cls for config_cls in SUPPORTED_CONFIGS}[ args.config_type ] if args.name: print(config_cls.getx(args.name)) return for config in config_cls.get_all(): print(config) def edit_config(args): """Update a config of the chosen type""" kwargs = {} for field in fields(args.config_cls): kwargs[field.name] = getattr(args, field.name) config = args.config_cls(**kwargs) hmaconfig.update_config(config) print(config) def load_defaults(_args): """ Load a hardcoded set of defaults which are useful in testing """ # Could also put the default on the class, but seems too fancy configs = [ ThreatExchangeConfig( name="303636684709969", fetcher_active=True, privacy_group_name="Test Config 1", write_back=True, in_use=True, description="test description", matcher_active=True, ), ThreatExchangeConfig( name="258601789084078", fetcher_active=True, privacy_group_name="Test Config 2", write_back=True, in_use=True, description="test description", matcher_active=True, ), WebhookPostActionPerformer( name="EnqueueForReview", url="https://webhook.site/ff7ebc37-514a-439e-9a03-46f86989e195", headers='{"Connection":"keep-alive"}', # monitoring page: # https://webhook.site/#!/ff7ebc37-514a-439e-9a03-46f86989e195 ), WebhookPostActionPerformer( name="EnqueueMiniCastleForReview", url="https://webhook.site/01cef721-bdcc-4681-8430-679c75659867", headers='{"Connection":"keep-alive"}', # monitoring page: # https://webhook.site/#!/01cef721-bdcc-4681-8430-679c75659867 ), WebhookPostActionPerformer( name="EnqueueSailboatForReview", url="https://webhook.site/fa5c5ad5-f5cc-4692-bf03-a03a4ae3f714", headers='{"Connection":"keep-alive"}', # monitoring page: # https://webhook.site/#!/fa5c5ad5-f5cc-4692-bf03-a03a4ae3f714 ), ActionRule( name="Enqueue Mini-Castle for Review", action_label=ActionLabel("EnqueueMiniCastleForReview"), must_have_labels=set( [ BankIDClassificationLabel("303636684709969"), ClassificationLabel("true_positive"), ] ), must_not_have_labels=set( [BankedContentIDClassificationLabel("3364504410306721")] ), ), ActionRule( name="Enqueue Sailboat for Review", action_label=ActionLabel("EnqueueSailboatForReview"), must_have_labels=set( [ BankIDClassificationLabel("303636684709969"), ClassificationLabel("true_positive"), BankedContentIDClassificationLabel("3364504410306721"), ] ), must_not_have_labels=set(), ), ] for config in configs: # Someday maybe can do filtering or something, I dunno # Add try catch block to avoid test failure try: hmaconfig.create_config(config) except ClientError as e: if e.response["Error"]["Code"] == "ConditionalCheckFailedException": print( "Can't insert duplicated config, " + e.response["Error"]["Message"], ) else: raise print(config) if __name__ == "__main__": parser = argparse.ArgumentParser(description=__doc__) parser.add_argument( "--table", default=f"{getpass.getuser()}-HMAConfig", help="The name of the config dynamodb table", ) subparsers = parser.add_subparsers() # Update update = subparsers.add_parser("update", help="Add or edit a config") update_subparsers = update.add_subparsers() for config_cls in SUPPORTED_CONFIGS: sub = update_subparsers.add_parser( config_cls.__name__, help="Update a config of this type" ) sub.set_defaults(config_cls=config_cls, fn=edit_config) for field in fields(config_cls): origin = t.get_origin(field.type) parse_type = str addtl_kwargs = {} if field.type in (str, float, int): parse_type = field.type elif field.type is bool: parse_type = better_bool_type elif field.type in (t.Set[str], t.List[str]): def parse_type(x): return set(x.split(",")) else: raise Exception(f"Unsupported type in config: {field.type}") arg = field.name if arg != "name": arg = f"--{arg}" addtl_kwargs["required"] = True sub.add_argument( arg, type=parse_type, metavar=re.sub( r"^typing.", "", re.sub(r"<class '([^']+)'>", r"\1", str(field.type)), ), help=f"{field.name}", **addtl_kwargs, ) # load_examples ex_subparser = subparsers.add_parser( "load_example_configs", help="Populate db with a default set of configs" ) ex_subparser.set_defaults(fn=load_defaults) # get get_config_parser = subparsers.add_parser("get", help="get configs") get_config_parser.add_argument( "config_type", choices=[config_cls.__name__ for config_cls in SUPPORTED_CONFIGS], help="which config type to get", ) get_config_parser.add_argument("--name", help="the name of the config") get_config_parser.set_defaults(fn=get_configs) args = parser.parse_args() hmaconfig.HMAConfig.initialize(args.table) args.fn(args)