hasher-matcher-actioner/hmalib/scripts/cli/soak.py (304 lines of code) (raw):

#! /usr/bin/env python3 # Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved """ Soak/endurance test a deployed instance of HMA Besides deploying you will need a place to set off the test (ec2 instance + long running tmux session) Structure of the soak test Initial set up (no point in a long running test if this doesn't work) - Access to API via refresh token (+ client_id) - Image used has a hash that will match the systems index - PR Configs (and associated signals) exist so match records will be created - Action Rules (and associated actions) exist so action will send post request Run - Submit Content Test (done every interval=seconds) - Sleep - Repeat """ import argparse import sys import typing as t import cmd import os import argparse import time import threading import uuid import datetime import numpy as np import pandas as pd import typing as t import hmalib.scripts.cli.command_base as base import hmalib.scripts.common.utils as utils from hmalib.scripts.common.client_lib import DeployedInstanceClient from hmalib.scripts.common.listener import Listener from hmalib.scripts.common.submitter import Submitter class SoakCommand(base.Command, base.NeedsAPIAccess): """ Start a soak test on a deployed HMA instance and submit until cancelled """ @classmethod def init_argparse(cls, ap: argparse.ArgumentParser) -> None: ap.add_argument( "--hostname", help="Hostname used to listen for the actions webhooks.", ) ap.add_argument( "--port", help="Port used to listen for the actioner.", default=8080, ) ap.add_argument( "--batch_size", help="Number of images to submit in each batch.", default=5, ) ap.add_argument( "--seconds_between_batches", help="Number of seconds between completed submission batches.", default=5, ) ap.add_argument( "--auto_start", action="store_true", help="Start submitting right away.", ) ap.add_argument( "--skip_listener", action="store_true", help="Do not use a listener at all.", ) ap.add_argument( "--filepaths", action="extend", nargs="*", type=str, help="List of filepaths for submit use (will start each batch at the start of the list).", ) @classmethod def get_name(cls) -> str: """The display name of the command""" return "soak" @classmethod def get_help(cls) -> str: """The short help of the command""" return "run a soak test" def __init__( self, hostname: str, port: int, batch_size: int, seconds_between_batches: int, auto_start: bool = False, skip_listener: bool = False, filepaths: t.List[str] = [], ) -> None: self.hostname = hostname self.port = int(port) self.batch_size = int(batch_size) self.seconds_between_batches = int(seconds_between_batches) self.auto_start = auto_start self.skip_listener = skip_listener self.filepaths = filepaths def execute(self, api: utils.HasherMatcherActionerAPI) -> None: helper = DeployedInstanceClient(api=api) if self.skip_listener: helper.set_up_test("http://httpstat.us/404") else: helper.set_up_test(f"http://{self.hostname}:{self.port}") submitter = Submitter( helper, self.batch_size, self.seconds_between_batches, self.filepaths ) if self.skip_listener: listener = None else: listener = Listener(self.hostname, self.port) listener.start_listening() if self.auto_start: time.sleep(3) submitter.start() SoakShell(submitter, listener).cmdloop() if submitter.is_alive(): submitter.stop() total_submit = submitter.get_total_submit_count() if listener: total_received = listener.get_post_request_count() listener.stop_listening() print("Submitter and listener stopped.") print(f"FINAL TOTAL SUBMITTED: {total_submit}") print(f"FINAL TOTAL POST requests received: {total_received}") difference = total_submit - total_received if difference: print(f"Difference of {difference} found") if difference > 0: print( "If you exited before waiting on the listener, this is expect. (Warning the actioner may keep trying for a bit)" ) if difference < 0: print( f"Negative difference means more action request than submissions were received. (likely bug or multiply actions configured)" ) else: print(f"FINAL TOTAL SUBMITTED: {total_submit}") if listener: if data := listener.get_submission_latencies(): _generate_latency_stats(data) helper.clean_up_test() print(f"Test Run Complete. Thanks!") class SoakShell(cmd.Cmd): intro = "Welcome! enter 'start' to begin submitting and 'info' to see current counts. Type help or ? to list commands.\n" prompt = "> " def __init__(self, submitter=None, listener=None): super(SoakShell, self).__init__() self.submitter = submitter self.listener = listener self.submitter_paused = False # Cache submitter setting so the lock can be used to pause if submitter: self._refresh_cached_submitter_settings() def _refresh_cached_submitter_settings(self): ( self.batch_size_cache, self.sec_btw_batch_cache, self.total_submitted_cache, ) = self.submitter.get_current_values() def do_info(self, arg): "Get status of the test: info" if self.submitter_paused: print("Submitter is paused.") else: self._refresh_cached_submitter_settings() print( f"Submitter Settings: {self.batch_size_cache} items every {self.sec_btw_batch_cache} seconds." ) print(f"TOTAL SUBMITTED: {self.total_submitted_cache}") if self.listener: print( f"TOTAL POST requests received: {self.listener.get_post_request_count()}" ) def do_latency(self, arg): "Get the latency of submissions: latency" if self.listener: if data := self.listener.get_submission_latencies(): _, _, latencies = list(zip(*data)) latencies = np.array(latencies[-10:]) if latencies.size: print( "Rough delay between submit to action request received (10 most recent)" ) print(f"avg: {latencies.mean()} seconds") return print("No requests received yet.") return print("No listener found.") def do_start(self, arg): "Start submitting thread: start" try: if self.submitter_paused: print("Submitter is paused. Use 'resume' instead of 'start'") return self._refresh_cached_submitter_settings() self.submitter.start() print("Started Submitter") except RuntimeError: if self.submitter.is_alive(): print("Submitter has already started.") else: print( "Submitter cannot be (re)started. Exit and run the script again to restart submitting." ) def do_pause(self, arg): "Pause submitting thread: PAUSE" if not self.submitter.is_alive(): print("Submitter is not running.") return if self.submitter_paused: print("Submitter is already paused.") return self._refresh_cached_submitter_settings() self.submitter._lock.acquire() print("Submitter Paused") print(f"TOTAL SUBMITTED: {self.total_submitted_cache}") self.submitter_paused = True def do_resume(self, arg): "Resume submitting thread: resume" if not self.submitter.is_alive(): print("Submitter is not running.") return if not self.submitter_paused: print("Submitter is not paused.") return self.submitter._lock.release() print("Resuming submissions") self.submitter_paused = False def do_stop(self, arg): "Stop submitting thread: stop" if not self.submitter.is_alive(): print("Submitter is not running.") return self.submitter.stop() if self.submitter_paused: self.submitter._lock.release() self.submitter_paused = False self._refresh_cached_submitter_settings() print("Stopped Submitter") print(f"TOTAL SUBMITTED: {self.total_submitted_cache}") def _valid_update(self, arg): if self.submitter_paused: print("Updates currently not supported while paused (keeps locking simple)") return False try: value = int(arg) except: print("value must be an int") return False if value <= 0: print("value must be positive") return False return True def do_update_batch_size(self, arg): "Update batch size: update_batch_size 5" if self._valid_update(arg): self.submitter.set_batch_size(int(arg)) self.batch_size_cache = int(arg) print(f"Updated batch_size to {self.batch_size_cache}") def do_update_sec_btw_batch(self, arg): "Update seconds between batch submissions: update_sec_btw_batch 5" if self._valid_update(arg): self.submitter.set_seconds_between_batches(int(arg)) self.sec_btw_batch_cache = int(arg) print(f"Updated seconds_between_batches to {self.sec_btw_batch_cache}") def _provide_wait_for_listener_option(self): submitted = self.submitter.get_total_submit_count() received = self.listener.get_post_request_count() if submitted - received > 0: cmd = input("Wait for listener to catch up before exiting? (y/N): ") if cmd == "y": submitted = self.submitter.get_total_submit_count() print(f"TOTAL SUBMITTED: {submitted}") received = self.listener.get_post_request_count() prev = -1 try: while submitted - received > 0: if received > prev: print( f"\tWaiting on {submitted-received} more requests", end="\r", ) prev = received received = self.listener.get_post_request_count() time.sleep(3) except KeyboardInterrupt: print("KeyboardInterrupt: Skipping wait\n") else: print("Not waiting for listener") def do_exit(self, arg): "Stop and exit: EXIT" if self.submitter.is_alive(): self.submitter.stop() if self.submitter_paused: self.submitter._lock.release() self.submitter_paused = False print("Stopped Submitter") if self.listener: self._provide_wait_for_listener_option() print("\nClosing Shell...\n") return True def _generate_latency_stats( data: t.List[t.Tuple[datetime.datetime, datetime.datetime, float]] ): timestamps, _, delays = list(zip(*data)) times = pd.to_datetime(np.array(timestamps, dtype="datetime64[ns]")) df = pd.DataFrame({"times": times, "delays": delays}) def func(x): a = x["delays"].mean() b = (x["delays"]).quantile(0.5) c = (x["delays"]).quantile(0.9) return pd.Series([a, b, c], index=["avg", "p50", "p90"]) df.index = df.times df = df.groupby(pd.Grouper(freq="1min")).apply(func) print("Breaking down completed action's latency by time received in 1 min buckets") print(df) filename = "soak_test_timestamps.txt" print(f"Writing times to {filename}") print(f"Format: time_submitted, time_action_received, delta_in_seconds") with open(filename, "a") as f: # append mode for record in data: f.write(f"{record[1].isoformat()}, {record[0].isoformat()}, {record[2]}\n")