elkserver/docker/redelk-base/redelkinstalldata/scripts/modules/enrich_domainscategorization/module.py (150 lines of code) (raw):
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
Part of RedELK
This script enriches domains lists with categorization data
Authors:
- Lorenzo Bernardi (@fastlorenzo)
"""
import datetime
import logging
import copy
# from modules.enrich_domainscategorization.cat_bluecoat import Bluecoat
from modules.enrich_domainscategorization.cat_ibmxforce import IBMXForce
from modules.enrich_domainscategorization.cat_mcafee import MCafee
from modules.enrich_domainscategorization.cat_vt import VT
from modules.helpers import (
get_initial_alarm_result,
get_value,
raw_search,
es,
)
# Module metadata; attached to the result returned by Module.run() and used
# to name the module's logger.
info = {
    "version": 0.1,
    "name": "Enrich domains lists with categorization data",
    "alarmmsg": "",
    "description": "This script enriches domains lists with categorization data",
    "type": "redelk_enrich",
    "submodule": "enrich_domainscategorization",
}
class Module:
    """Enrich domains lists documents with categorization data."""

    def __init__(self):
        self.logger = logging.getLogger(info["submodule"])
        self.now = datetime.datetime.utcnow()
        # Names of the categorization engines queried by check_domains().
        self.enabled_engines = ["vt", "ibmxforce", "mcafee"]

    def run(self):
        """Run the enrich module.

        Fetches the domains lists documents, checks every domain with the
        enabled engines and stores categorization changes.

        Returns:
            The result dict obtained from get_initial_alarm_result(), with
            ``info`` attached, an empty ``hits.hits`` list and ``hits.total``
            set to the number of returned hits.
        """
        ret = get_initial_alarm_result()
        ret["info"] = info
        self.now = datetime.datetime.utcnow()

        # 1. Get all domains from the different domains lists
        domains = self.get_domains()
        self.logger.debug("Domains: %s", domains)

        # 2. Check all domains
        checked_domains = self.check_domains(domains)
        self.logger.debug("Checked domains: %s", checked_domains)

        # 3. Loop through each result and update the categorization data
        self.update_categorization_data(domains, checked_domains)

        # 4. This module returns no hits to tag; keep the total a number so
        #    it is consistent with the (empty) hits list.
        ret["hits"]["hits"] = []
        ret["hits"]["total"] = len(ret["hits"]["hits"])

        self.logger.info(
            "finished running module. result: %s hits", ret["hits"]["total"]
        )
        return ret

    def get_domains(self):
        """Get all domains from the different domains lists.

        Returns:
            A dict mapping each domain name to the search hit it was read
            from. Empty when the search returns nothing.
        """
        domains = {}

        # No query filter is applied.
        es_query = {}
        es_results = raw_search(es_query, index="redelk-domainslist-*")

        if not es_results:
            return domains

        for domain_doc in es_results["hits"]["hits"]:
            domain = get_value("_source.domainslist.domain", domain_doc)
            if not domain:
                # A document without a domain cannot be categorized; storing
                # it under a None/empty key would only feed garbage to the
                # engines.
                self.logger.warning(
                    "Skipping domains list document %s: no domain found",
                    domain_doc.get("_id"),
                )
                continue
            domains[domain] = domain_doc

        return domains

    def check_domains(self, domains):
        """Check the categorization of every domain with the enabled engines.

        Args:
            domains: iterable of domain names (the dict from get_domains()).

        Returns:
            A dict mapping each domain to
            ``{"categorization": {"engines", "categories", "categories_str"}}``.
            An engine that fails, or returns an unusable result, contributes
            empty categories; an unknown engine name is logged and skipped.
        """
        # bluecoat = Bluecoat()
        ibmxforce = IBMXForce()
        mcafee = MCafee()
        vt = VT()  # pylint: disable=invalid-name

        # Dispatch table: engine name -> checker instance
        engines = {
            "vt": vt,
            "ibmxforce": ibmxforce,
            "mcafee": mcafee,
            # "bluecoat": bluecoat,
        }

        checked_domains = {}
        for domain in domains:
            categorization = {
                "engines": {},
                "categories": [],
                "categories_str": "",
            }
            checked_domains[domain] = {"categorization": categorization}

            # Loop through all enabled engines and check the domain
            for engine in self.enabled_engines:
                self.logger.debug("Checking %s with %s", domain, engine)

                checker = engines.get(engine)
                if checker is None:
                    # Skip instead of recording the previous engine's result
                    # (or failing on an unbound variable) for this name.
                    self.logger.error("Unknown engine: %s", engine)
                    continue

                try:
                    result = copy.deepcopy(checker.check_domain(domain))
                    # Read the fields inside the try so a malformed result is
                    # handled like any other engine failure.
                    categories = result["categories"]
                    extra_data = result["extra_data"]
                    categories_joined = ",".join(categories)
                except Exception as err:  # pylint: disable=broad-except
                    self.logger.error(
                        "Error checking domain %s with %s: %s", domain, engine, err
                    )
                    categories = []
                    extra_data = {}
                    categories_joined = ""

                categorization["engines"][engine] = {
                    "categories": categories,
                    "extra_data": extra_data,
                }
                categorization["categories"].extend(categories)
                # NOTE(review): entries are appended without a separator
                # between engines. Kept as-is because this string is compared
                # with the stored value to detect changes.
                categorization["categories_str"] += f"{engine}={categories_joined}"

        return checked_domains

    def update_categorization_data(self, domains, checked_domains):
        """Update the categorization data for each domain that changed.

        Args:
            domains: dict of domain -> search hit, as built by get_domains().
            checked_domains: dict of domain -> new categorization, as built
                by check_domains().

        For every domain whose ``categories_str`` differs from the stored
        one, the document is updated and a bluecheck entry is added.
        """
        for domain, domain_doc in domains.items():
            self.logger.debug("Updating categorization data for %s", domain)

            # Check if current categorization data is different from the new one
            new_categories = get_value(
                "categorization.categories_str", checked_domains[domain], ""
            )
            old_categories = get_value(
                "_source.domainslist.categorization.categories_str", domain_doc, ""
            )
            self.logger.debug("New categories: %s", new_categories)
            self.logger.debug("Old categories: %s", old_categories)

            if new_categories == old_categories:
                continue

            self.logger.debug(
                "Updating categorization data for %s with %s",
                domain,
                new_categories,
            )

            # Get old categorization data to add in bluecheck
            try:
                old_categorization = copy.deepcopy(
                    domain_doc["_source"]["domainslist"]["categorization"]
                )
            except Exception as err:  # pylint: disable=broad-except
                self.logger.error(
                    "Error getting old categorization data for %s: %s", domain, err
                )
                old_categorization = {
                    "categories_str": get_value(
                        "_source.domainslist.categorization.categories_str",
                        domain_doc,
                        "",
                    ),
                    "categories": get_value(
                        "_source.domainslist.categorization.categories",
                        domain_doc,
                        [],
                    ),
                }

            domain_doc["_source"]["domainslist"]["categorization"] = checked_domains[
                domain
            ]["categorization"]

            es.update(
                index=domain_doc["_index"],
                id=domain_doc["_id"],
                body={"doc": domain_doc["_source"]},
            )

            self.add_bluecheck_entry(domain_doc, old_categorization)

    def add_bluecheck_entry(self, domain, old_categorization):
        """Add an entry to the bluecheck index.

        Args:
            domain: the domains list search hit (needs ``_id`` and ``_source``).
            old_categorization: the categorization data being replaced; it is
                stored under ``domainslist.categorization.old``.

        The entry is built from a copy of ``_source`` so the caller's
        document is left untouched.
        """
        # Work on a copy: the caller's _source shares its categorization dict
        # with the check results, and must not gain "old"/"@timestamp" fields.
        data = copy.deepcopy(domain["_source"])
        self.logger.debug(
            "Adding bluecheck entry with data: %s [old:%s]", data, old_categorization
        )
        data["domainslist"]["categorization"]["old"] = old_categorization

        now = datetime.datetime.utcnow()
        # The timestamp suffix makes a new document id for each change.
        doc_id = f"{domain['_id']}-{now.timestamp()}"

        # Add checked_at field
        data["@timestamp"] = now.isoformat()

        # Create the document in bluecheck index
        es.create(index="bluecheck-domains", body=data, id=doc_id)