elkserver/docker/redelk-base/redelkinstalldata/scripts/daemon.py (175 lines of code) (raw):

#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
Part of RedELK

Script to check if there are alarms to be sent

Authors:
- Outflank B.V. / Mark Bergman (@xychix)
- Lorenzo Bernardi (@fastlorenzo)
"""
import copy
import importlib
import logging
import os
import traceback

from config import alarms, LOGLEVEL, notifications
from modules.helpers import (
    add_alarm_data,
    group_hits,
    module_did_run,
    set_tags,
    module_should_run,
)

MODULES_PATH = "./modules/"

# Bound at module level so the functions below can log even when this file is
# imported rather than run as a script. (Previously `logger` was only assigned
# inside the __main__ guard, so any import-time call raised NameError.)
logger = logging.getLogger("alarm")


def load_modules():
    """Attempt to load the different modules in their respective dictionaries, and return them.

    Scans MODULES_PATH for package folders, imports each ``modules.<name>.module``,
    and buckets it by its ``info['type']`` into one of three dicts keyed by module
    name, each entry holding ``info`` (the module's metadata), ``m`` (the imported
    module object) and ``status`` (initially ``'pending'``).

    Returns:
        tuple: (alarm_dict, connector_dict, enrich_dict)
    """
    alarm_dict = {}  # aD alarm Dict
    connector_dict = {}  # cD connector Dict
    enrich_dict = {}  # eD enrich Dict
    module_folders = os.listdir(MODULES_PATH)
    for module_name in module_folders:
        # only take folders and not '__pycache__'
        if (
            os.path.isdir(os.path.join(MODULES_PATH, module_name))
            and module_name != "__pycache__"
        ):
            try:
                module = importlib.import_module(f"modules.{module_name}.module")
                # A valid RedELK module exposes both an `info` dict and a `Module` class
                if hasattr(module, "info") and hasattr(module, "Module"):
                    module_type = module.info.get("type", None)
                    if module_type == "redelk_alarm":
                        alarm_dict[module_name] = {}
                        alarm_dict[module_name]["info"] = module.info
                        alarm_dict[module_name]["m"] = module
                        alarm_dict[module_name]["status"] = "pending"
                    elif module_type == "redelk_connector":
                        connector_dict[module_name] = {}
                        connector_dict[module_name]["info"] = module.info
                        connector_dict[module_name]["m"] = module
                        connector_dict[module_name]["status"] = "pending"
                    elif module_type == "redelk_enrich":
                        enrich_dict[module_name] = {}
                        enrich_dict[module_name]["info"] = module.info
                        enrich_dict[module_name]["m"] = module
                        enrich_dict[module_name]["status"] = "pending"
            # pylint: disable=broad-except
            except Exception as error:
                # A broken module must not prevent the remaining ones from loading
                logger.error("Error in module %s: %s", module_name, error)
                logger.exception(error)
    return (alarm_dict, connector_dict, enrich_dict)


def run_enrichments(enrich_dict):
    """Run the different enrichment scripts that are enabled.

    For each enrichment module that ``module_should_run`` approves, instantiate
    its ``Module`` class, run it, tag each returned hit with the submodule name,
    and record the outcome via ``module_did_run``. Each entry's ``status`` is set
    to ``'success'``, ``'error'`` or ``'did_not_run'``.

    Args:
        enrich_dict: enrichment-module dict as produced by load_modules().

    Returns:
        dict: the same enrich_dict, updated in place with 'result' and 'status'.
    """
    logger.info("Running enrichment modules")
    # First loop through the enrichment modules
    for enrich_module in enrich_dict:
        if module_should_run(enrich_module, "redelk_enrich"):
            try:
                logger.debug("[e] initiating class Module() in %s", enrich_module)
                module_class = enrich_dict[enrich_module]["m"].Module()
                logger.debug(
                    "[e] Running Run() from the Module class in %s", enrich_module
                )
                # deepcopy so later mutations of the result don't alias module state
                enrich_dict[enrich_module]["result"] = copy.deepcopy(module_class.run())
                # Now loop through the hits and tag them
                for hit in enrich_dict[enrich_module]["result"]["hits"]["hits"]:
                    set_tags(enrich_dict[enrich_module]["info"]["submodule"], [hit])
                hits = len(enrich_dict[enrich_module]["result"]["hits"]["hits"])
                module_did_run(
                    enrich_module,
                    "enrich",
                    "success",
                    f"Enriched {hits} documents",
                    hits,
                )
                enrich_dict[enrich_module]["status"] = "success"
            # pylint: disable=broad-except
            except Exception as error:
                stack_trace = traceback.format_exc()
                msg = f"Error running enrichment {enrich_module}: {error} | StackTrace: {stack_trace}"
                logger.error(msg)
                logger.exception(error)
                module_did_run(enrich_module, "enrich", "error", msg)
                enrich_dict[enrich_module]["status"] = "error"
        else:
            enrich_dict[enrich_module]["status"] = "did_not_run"
    return enrich_dict


def run_alarms(alarm_dict):
    """Run the different alarm scripts that are enabled and return the results.

    Mirrors run_enrichments(): for each approved alarm module, instantiate its
    ``Module`` class, run it, store a deep copy of the result, and record the
    outcome via ``module_did_run``. Each entry's ``status`` is set to
    ``'success'``, ``'error'`` or ``'did_not_run'``.

    Args:
        alarm_dict: alarm-module dict as produced by load_modules().

    Returns:
        dict: the same alarm_dict, updated in place with 'result' and 'status'.
    """
    logger.info("Running alarm modules")
    # this means we've loaded the modules and will now loop over those one by one
    for alarm_module in alarm_dict:
        if module_should_run(alarm_module, "redelk_alarm"):
            try:
                logger.debug("[a] initiating class Module() in %s", alarm_module)
                module_class = alarm_dict[alarm_module]["m"].Module()
                logger.debug(
                    "[a] Running Run() from the Module class in %s", alarm_module
                )
                alarm_dict[alarm_module]["result"] = copy.deepcopy(module_class.run())
                hits = len(alarm_dict[alarm_module]["result"]["hits"]["hits"])
                module_did_run(
                    alarm_module,
                    "alarm",
                    "success",
                    f"Found {hits} documents to alarm",
                    hits,
                )
                alarm_dict[alarm_module]["status"] = "success"
            # pylint: disable=broad-except
            except Exception as error:
                stack_trace = traceback.format_exc()
                msg = f"Error running alarm {alarm_module}: {error} | StackTrace: {stack_trace}"
                logger.error(msg)
                logger.exception(error)
                module_did_run(alarm_module, "alarm", "error", msg)
                alarm_dict[alarm_module]["status"] = "error"
        else:
            alarm_dict[alarm_module]["status"] = "did_not_run"
    return alarm_dict


def process_alarms(connector_dict, alarm_dict):
    """Process the alarm results and send notifications via connector modules.

    For every enabled alarm that ran successfully: attach mutation data to each
    hit, tag the hits with the alarm's submodule name, then (on a fresh deep
    copy) group the hits and hand the result to every enabled connector's
    ``send_alarm``.

    Args:
        connector_dict: connector-module dict as produced by load_modules().
        alarm_dict: alarm-module dict after run_alarms() has populated results.
    """
    logger.info("Processing alarms")
    # now we can loop over the modules once again and log the lines
    for alarm in alarm_dict:
        if alarm in alarms and alarms[alarm]["enabled"]:
            alarm_status = alarm_dict[alarm]["status"]
            # If the alarm did fail to run, skip processing the notification and tagging as we are not sure of the results
            if alarm_status == "error":
                logger.warning(
                    "Alarm %s did not run correctly, skipping processing (status: %s)",
                    alarm,
                    alarm_status,
                )
                continue
            if alarm_status == "did_not_run":
                logger.debug(
                    "Alarm %s did not run (this was expected), skipping processing (status: %s)",
                    alarm,
                    alarm_status,
                )
                continue
            if alarm_status == "unknown":
                # Fixed log message: was "returned and unknown status"
                logger.warning(
                    "Alarm %s returned an unknown status (this should never happen), skipping processing (status: %s)",
                    alarm,
                    alarm_status,
                )
                continue
            logger.debug("Alarm %s enabled, processing hits", alarm)
            result = alarm_dict[alarm]["result"]
            alarm_name = alarm_dict[alarm]["info"]["submodule"]
            # logger.debug('Alarm results: %s' % aD[a]['result'])
            for result_hits in result["hits"]["hits"]:
                # First check if there is a mutation data to add
                logger.debug(result_hits)
                if result_hits["_id"] in result["mutations"]:
                    mutations = result["mutations"][result_hits["_id"]]
                else:
                    mutations = {}
                # And now, let's add mutations data to the doc and update back the hits
                result_hits = add_alarm_data(result_hits, mutations, alarm_name)
            # Let's tag the docs with the alarm name
            set_tags(alarm_name, result["hits"]["hits"])
            logger.debug(
                "calling settags %s (%d hits)", alarm_name, result["hits"]["total"]
            )
            # Needed as groupHits will change r['hits']['hits'] and different alarms might do different grouping
            result = copy.deepcopy(alarm_dict[alarm]["result"])
            if result["hits"]["total"] > 0:
                # Group the hits before sending it to the alarm, based on the 'groupby' array returned by the alarm
                group_by = list(result["groupby"])
                result["hits"]["hits"] = group_hits(result["hits"]["hits"], group_by)
                for connector in connector_dict:
                    # connector will process ['hits']['hits'] which contains a list of 'jsons' looking like an ES line
                    # connector will report the fields in ['hits']['fields'] for each of the lines in the list
                    if (
                        connector in notifications
                        and notifications[connector]["enabled"]
                    ):
                        connector_mod = connector_dict[connector]["m"].Module()
                        logger.info(
                            "connector %s enabled, sending alarm (%d hits)",
                            connector,
                            result["hits"]["total"],
                        )
                        connector_mod.send_alarm(result)


# Main entry point of the file
if __name__ == "__main__":
    logging.basicConfig(
        format="%(asctime)s - %(levelname)s - %(name)s - %(filename)s - %(funcName)s -- %(message)s",
        level=LOGLEVEL,
    )
    # 1. Load all modules
    (aD, cD, eD) = load_modules()
    # 2. Run enrichment modules
    eD = run_enrichments(eD)
    # 3. Run alarm modules
    aD = run_alarms(aD)
    # 4. Process the alarms generated by alarm modules
    process_alarms(cD, aD)