elkserver/docker/redelk-base/redelkinstalldata/scripts/modules/alarm_httptraffic/module.py (98 lines of code) (raw):

#!/usr/bin/python3 # -*- coding: utf-8 -*- """ Part of RedELK This check queries for IP's that aren't listed in any iplist* but do talk to c2* paths on redirectors Authors: - Outflank B.V. / Mark Bergman (@xychix) - Lorenzo Bernardi (@fastlorenzo) """ import logging from modules.helpers import get_initial_alarm_result, get_value, raw_search info = { "version": 0.1, "name": "HTTP Traffic module", "alarmmsg": "UNKNOWN IP TO C2_ backend", "description": "This check queries for IP's that aren't listed in any iplist* but do talk to c2* paths on redirectors", "type": "redelk_alarm", # Could also contain redelk_enrich if it was an enrichment module "submodule": "alarm_httptraffic", } class Module: """HTTP Traffic module""" def __init__(self): self.logger = logging.getLogger(info["submodule"]) def run(self): """Run the alarm module""" ret = get_initial_alarm_result() ret["info"] = info ret["fields"] = [ "agent.hostname", "source.ip", "source.cdn.ip", "source.geo.country_name", "source.as.organization.name", "redir.frontend.name", "redir.backend.name", "infra.attack_scenario", "tags", "redir.timestamp", ] ret["groupby"] = ["source.ip"] alarmed_ips = self.get_alarmed_ips() report = self.alarm_check(alarmed_ips) ret["hits"]["hits"] = report ret["hits"]["total"] = len(report) self.logger.info( "finished running module. result: %s hits", ret["hits"]["total"] ) return ret def get_alarmed_ips(self): # pylint: disable=no-self-use """Returns all previous IPs that have been alarmed already""" es_query = { "sort": [{"@timestamp": {"order": "desc"}}], "query": { "bool": { "filter": [ {"range": {"@timestamp": {"gte": "now-1y"}}}, {"match": {"tags": info["submodule"]}}, ] } }, } res = raw_search(es_query, index="redirtraffic-*") if res is None: alarmed_hits = [] else: alarmed_hits = res["hits"]["hits"] # Created a dict grouped by IP address (from source.ip) ips = {} for alarmed_hit in alarmed_hits: # pylint: disable=invalid-name ip = get_value("_source.source.ip", alarmed_hit) if ip in ips: ips[ip].append(alarmed_hit) else: ips[ip] = [alarmed_hit] return ips def alarm_check(self, alarmed_ips): # pylint: disable=no-self-use """This check queries for IP's that aren't listed in any iplist* but do talk to c2* paths on redirectors""" es_query = { "sort": [{"@timestamp": {"order": "desc"}}], "query": { "bool": { "filter": [{"match": {"tags": "enrich_iplists"}}], "must": { "query_string": { "fields": ["redir.backend.name"], "query": "c2*", } }, "must_not": [ {"query_string": {"fields": ["tags"], "query": "iplist_*"}}, {"match": {"tags": info["submodule"]}}, ], } }, } res = raw_search(es_query, index="redirtraffic-*") if res is None: not_enriched_hits = [] else: not_enriched_hits = res["hits"]["hits"] # Created a dict grouped by IP address (from source.ip) ips = {} for not_enriched in not_enriched_hits: # pylint: disable=invalid-name ip = get_value("_source.source.ip", not_enriched) if ip in ips: ips[ip].append(not_enriched) else: ips[ip] = [not_enriched] hits = [] # Now we check if the IPs have already been alarmed in the past timeframe defined in the config # pylint: disable=invalid-name for ip, ip_val in ips.items(): # Not alarmed yet, process it if ip not in alarmed_ips: hits += ip_val # Return the array of new IP documents to be alarmed return hits