elkserver/docker/redelk-base/redelkinstalldata/scripts/modules/helpers.py (254 lines of code) (raw):

#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
Part of RedELK

Shared helper functions used by the RedELK modules: small conversion and
validation utilities, thin wrappers around the Elasticsearch client, and
bookkeeping of module runs in the `redelk-modules` index.

Authors:
- Outflank B.V. / Mark Bergman (@xychix)
- Lorenzo Bernardi (@fastlorenzo)
"""
import copy
import datetime
import json
import logging
import os
import re

from elasticsearch import Elasticsearch
import urllib3

import config

# Domain name regex pattern
domain_pattern = re.compile(
    r"^((?:[a-zA-Z0-9]"  # First character of the domain
    r"(?:[a-zA-Z0-9-_]{0,61}[A-Za-z0-9])?\.)"  # Sub domain + hostname
    r"+[A-Za-z0-9][A-Za-z0-9-_]{0,61}"  # First 61 characters of the gTLD
    r"[A-Za-z])"  # Last character of the gTLD
    r"(?:\s*#\s*(.*))?$"  # Optional comment
)

# Certificate verification is disabled for the ES connection below, so the
# corresponding urllib3 warnings are silenced as well.
urllib3.disable_warnings()
es = Elasticsearch(config.es_connection, verify_certs=False)

logger = logging.getLogger("helpers")


def pprint(to_print):
    """Return a printable representation of an object.

    Strings are returned unchanged; any other object is serialised as
    indented JSON with sorted keys.
    """
    if isinstance(to_print, str):
        return to_print
    return json.dumps(to_print, indent=2, sort_keys=True)


def to_unicode(obj, charset="utf-8", errors="strict"):
    """Convert obj to a str.

    Returns None for None, decodes bytes using `charset`/`errors`, and calls
    str() on anything else.
    """
    if obj is None:
        return None
    if not isinstance(obj, bytes):
        return str(obj)
    return obj.decode(charset, errors)


def is_json(myjson):
    """Return True if `myjson` can be parsed as JSON, False otherwise."""
    try:
        json.loads(myjson)
    except (ValueError, TypeError):
        # TypeError: input is not str/bytes/bytearray (e.g. None or a dict)
        return False
    return True


def match_domain_name(domain):
    """Return the regex match if `domain` is a valid domain name, else None.

    The domain is IDNA-encoded before being matched against `domain_pattern`;
    group 1 of the match is the domain, group 2 the optional `# comment`.
    """
    try:
        return domain_pattern.match(to_unicode(domain).encode("idna").decode("ascii"))
    except (UnicodeError, AttributeError):
        # UnicodeError: not IDNA-encodable; AttributeError: domain is None
        return None


def get_value(path, source, default_value=None):
    """Get the value found in `source` by following the dotted `path`.

    Returns `default_value` (default: None) when any key of the path is
    missing or when an intermediate value cannot be indexed by key.
    When the last key is "ip" and the value is a list, its first element is
    returned (or `default_value` if the list is empty).
    """
    keys = path.split(".")
    node = source
    for key in keys:
        try:
            if key not in node:
                return default_value
            node = node[key]
        except TypeError:
            # Intermediate value is not a key-indexable container (None, int, str, ...)
            return default_value

    if keys[-1] == "ip" and isinstance(node, list):
        return node[0] if node else default_value
    return node


def get_query(query, size=5000, index="redirtraffic-*"):
    """Run a query_string search and return the list of hits ([] if none)."""
    es_query = {"query": {"query_string": {"query": query}}}
    # pylint: disable=unexpected-keyword-arg
    es_result = es.search(index=index, body=es_query, size=size)
    if es_result["hits"]["total"]["value"] == 0:
        return []
    return es_result["hits"]["hits"]


def get_hits_count(query, index="redirtraffic-*"):
    """Return the total number of hits for a given query_string query."""
    es_query = {"query": {"query_string": {"query": query}}}
    # pylint: disable=unexpected-keyword-arg
    # size=0: only the total is needed, not the documents themselves
    es_result = es.search(index=index, body=es_query, size=0)
    return es_result["hits"]["total"]["value"]


def raw_search(query, size=10000, index="redirtraffic-*"):
    """Execute a raw ES query body.

    Returns the complete search response, or None when the total hit count
    is 0.
    """
    # pylint: disable=unexpected-keyword-arg
    es_result = es.search(index=index, body=query, size=size)
    if es_result["hits"]["total"]["value"] == 0:
        return None
    return es_result


def set_tags(tag, lst):
    """Add `tag` to every document in `lst` and write each document back to ES.

    Existing tags are preserved; the tag is not duplicated when it is already
    present.
    """
    for doc in lst:
        source = doc["_source"]
        if "tags" not in source:
            source["tags"] = [tag]
        elif tag not in source["tags"]:
            source["tags"].append(tag)
        es.update(index=doc["_index"], id=doc["_id"], body={"doc": source})


def add_tags_by_query(tags, query, index="redirtraffic-*"):
    """Add tags by DSL query in batch.

    Builds a painless script from the repr() of each tag and runs it through
    update_by_query on all documents matching `query`. Returns the ES response.
    """
    tags_string = ",".join(map(repr, tags))
    # NOTE(review): the script calls `add` with a list literal, which appends
    # the list as one element; confirm whether `addAll` was intended.
    update_q = {
        "script": {
            "source": f"ctx._source.tags.add([{tags_string}])",
            "lang": "painless",
        },
        "query": query,
    }
    return es.update_by_query(index=index, body=update_q)


def add_alarm_data(doc, data, alarm_name, alarmed=True):
    """Add alarm extra data to the source doc and write it back to ES.

    `data` is stored under `alarm.<alarm_name>`; note that `data` itself is
    modified (it receives `last_checked` and, if `alarmed`, `last_alarmed`).
    Returns the updated doc.
    """
    now_str = datetime.datetime.utcnow().isoformat()

    # Create the alarm field if it doesn't exist yet
    if "alarm" not in doc["_source"]:
        doc["_source"]["alarm"] = {}

    # Set the last checked date
    data["last_checked"] = now_str
    doc["_source"]["alarm"]["last_checked"] = now_str

    # Set the last alarmed date (if alarmed)
    if alarmed:
        doc["_source"]["alarm"]["last_alarmed"] = now_str
        data["last_alarmed"] = now_str

    # Add the extra data
    doc["_source"]["alarm"][alarm_name] = data

    es.update(index=doc["_index"], id=doc["_id"], body={"doc": doc["_source"]})
    return doc


def set_checked_date(doc):
    """Set `alarm.last_checked` to the current UTC time and write the doc to ES.

    Returns the updated doc.
    """
    now_str = datetime.datetime.utcnow().isoformat()
    if "alarm" in doc["_source"]:
        doc["_source"]["alarm"]["last_checked"] = now_str
    else:
        doc["_source"]["alarm"] = {"last_checked": now_str}

    es.update(index=doc["_index"], id=doc["_id"], body={"doc": doc["_source"]})
    return doc


def group_hits(hits, groupby, res=None):
    """Group hits by a list of field names and return one hit per group.

    `groupby` holds field names in dot notation, relative to `_source`. The
    hits are grouped on the combination of all these fields and the first hit
    of every group is returned. With an empty `groupby`, `hits` is returned
    unchanged. `res` carries the intermediate groups between recursive calls
    and should not be passed by callers. `groupby` is not modified.
    """
    if groupby:
        field = f"_source.{groupby[0]}"
        hits_list = {}
        if res is None:
            # First level: group directly on the value of the first field
            for hit in hits:
                hits_list.setdefault(get_value(field, hit), []).append(hit)
        else:
            # Deeper levels: refine every existing group with the next field
            for key, group in res.items():
                for hit in group:
                    tmp_key = f"{key} / {get_value(field, hit)}"
                    hits_list.setdefault(tmp_key, []).append(hit)
        # Slice instead of pop(0) so the caller's list is left untouched
        return group_hits(hits, groupby[1:], hits_list)

    if res is None:
        return hits

    return [group[0] for group in res.values()]


def get_last_run(module_name):
    """Return the last time the module ran, as stored in `redelk-modules`.

    Returns `datetime.datetime.fromtimestamp(0)` when no run is recorded or
    when the stored timestamp cannot be retrieved or parsed.
    """
    try:
        query = {"query": {"term": {"module.name": module_name}}}
        es_result = raw_search(query, index="redelk-modules")
        # raw_search returns None when there are no hits
        if es_result is None or len(es_result["hits"]["hits"]) == 0:
            return datetime.datetime.fromtimestamp(0)

        es_timestamp = get_value(
            "_source.module.last_run.timestamp", es_result["hits"]["hits"][0]
        )
        try:
            return datetime.datetime.strptime(es_timestamp, "%Y-%m-%dT%H:%M:%S.%f")
        except ValueError:
            # isoformat() omits the fractional part when microsecond == 0
            return datetime.datetime.strptime(es_timestamp, "%Y-%m-%dT%H:%M:%S")
    # pylint: disable=broad-except
    except Exception as error:
        logger.debug("Error parsing last run time: %s", error)
        return datetime.datetime.fromtimestamp(0)


def module_did_run(
    module_name, module_type="unknown", status="unknown", message=None, count=0
):
    """Record in the `redelk-modules` index that a module has run.

    Stores the current UTC time together with `status`, `count` and the
    optional `message`, using `module_name` as document id.
    Returns True when the record was written, False on any error.
    """
    logger.debug(
        "Module did run: %s:%s [%s] %s", module_type, module_name, status, message
    )
    try:
        now_ts = datetime.datetime.utcnow().isoformat()
        doc = {
            "module": {
                "name": module_name,
                "type": module_type,
                "last_run": {"timestamp": now_ts, "status": status, "count": count},
            }
        }
        if message:
            doc["module"]["last_run"]["message"] = message
        es.index(index="redelk-modules", id=module_name, body=doc)
        return True
    # pylint: disable=broad-except
    except Exception as error:
        logger.error(
            "Error writting last run time for module %s: %s",
            module_name,
            error,
        )
        logger.exception(error)
        return False


def module_should_run(module_name, module_type):
    """Check if the module is enabled and when it last ran.

    The module is allowed to run when it is present in the configuration for
    its type (`config.alarms` for "redelk_alarm", `config.enrich` for
    "redelk_enrich"), is not disabled there, and its last run is older than
    now minus the configured `interval` in seconds (default: 360).
    Returns True if the module should run, False otherwise.
    """
    if module_type == "redelk_alarm":
        module_configs = config.alarms
        missing_msg = "Missing configuration for alarm [%s]. Will not run!"
        disabled_msg = "Alarm module [%s] disabled in configuration file. Will not run!"
    elif module_type == "redelk_enrich":
        module_configs = config.enrich
        missing_msg = "Missing configuration for enrichment module [%s]. Will not run!"
        disabled_msg = (
            "Enrichment module [%s] disabled in configuration file. Will not run!"
        )
    else:
        logger.warning(
            "Invalid module type for shouldModuleRun(%s, %s)", module_name, module_type
        )
        return False

    if module_name not in module_configs:
        logger.warning(missing_msg, module_name)
        return False

    module_config = module_configs[module_name]
    if "enabled" in module_config and not module_config["enabled"]:
        logger.warning(disabled_msg, module_name)
        return False

    interval_seconds = module_config["interval"] if "interval" in module_config else 360

    now = datetime.datetime.utcnow()
    last_run = get_last_run(module_name)
    last_run_max = now - datetime.timedelta(seconds=interval_seconds)

    should_run = last_run < last_run_max

    if not should_run:
        logger.info(
            "Module [%s] already ran within the interval of %s seconds (%s)",
            module_name,
            interval_seconds,
            last_run.isoformat(),
        )
    else:
        logger.info("All checks ok for module [%s]. Module should run.", module_name)
        logger.debug(
            "Last run: %s | Last run max: %s",
            last_run.isoformat(),
            last_run_max.isoformat(),
        )
    return should_run


def get_initial_alarm_result():
    """Return a fresh deep copy of the `initial_alarm_result` template."""
    return copy.deepcopy(initial_alarm_result)


# Template for the result object of an alarm module; always obtain it through
# get_initial_alarm_result() so the template itself is never modified.
initial_alarm_result = {
    "info": {
        "version": 0.0,
        "name": "unknown",
        "alarmmsg": "unkown",
        "description": "unknown",
        "type": "redelk_alarm",
        "submodule": "unknown",
    },
    "hits": {"hits": [], "total": 0},
    "mutations": {},
    "fields": ["host.name", "user.name", "@timestamp", "c2.message"],
    "groupby": [],
    "status": "unknown",
}