elkserver/docker/redelk-base/redelkinstalldata/scripts/modules/enrich_tor/module.py (126 lines of code) (raw):
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
Part of RedELK
This script enriches redirtraffic documents with data from tor exit nodes
Authors:
- Outflank B.V. / Mark Bergman (@xychix)
- Lorenzo Bernardi (@fastlorenzo)
"""
import datetime
import ipaddress
import logging

import requests
from elasticsearch import helpers

from modules.helpers import (
    get_initial_alarm_result,
    es,
    get_value,
    raw_search,
    get_last_run,
)
from config import enrich
# Module metadata. "submodule" is also used as this module's logger name
# and as its key in the `enrich` config.
info = {
    # Module version
    "version": 0.1,
    # Human-readable module name
    "name": "Enrich redirtraffic lines with tor exit nodes",
    # No alarm message: this is an enrichment module
    "alarmmsg": "",
    "description": "This script enriches redirtraffic documents with data from tor exit nodes",
    # Module category
    "type": "redelk_enrich",
    "submodule": "enrich_tor",
}
class Module:
    """Enrich redirtraffic documents with data from Tor exit nodes.

    The Tor bulk exit list is cached in Elasticsearch as ``iplist.name: tor``
    documents and re-downloaded from ``tor_exitlist_url`` once the newest
    cached document is older than ``self.cache`` seconds.
    """

    # Seconds to wait for the Tor project server before a sync is abandoned;
    # without a timeout requests.get() can block the module run indefinitely.
    http_timeout = 30

    def __init__(self):
        self.logger = logging.getLogger(info["submodule"])
        self.tor_exitlist_url = "https://check.torproject.org/torbulkexitlist"
        # Re-query after 1 hour by default. A config section for this module
        # without a "cache" key also falls back to the default instead of
        # raising KeyError.
        module_config = enrich.get(info["submodule"]) or {}
        self.cache = module_config.get("cache", 3600)

    def run(self):
        """Run the module.

        Re-downloads the Tor exit node list when the ES copy is older than
        ``self.cache`` seconds (otherwise reads it back from ES), then stores
        the redirtraffic documents whose source IP is a Tor exit node in
        ``ret["hits"]``.

        Returns:
            dict: the alarm/enrichment result from get_initial_alarm_result().
        """
        ret = get_initial_alarm_result()
        ret["info"] = info

        # Sync when the newest cached entry is older than the cache interval
        last_sync = self.get_last_sync()
        oldest_allowed = datetime.datetime.utcnow() - datetime.timedelta(
            seconds=self.cache
        )
        if last_sync < oldest_allowed:
            self.logger.info("Tor cache expired, fetching latest exit nodes list")
            iplist = self.sync_tor_exitnodes()
        else:
            iplist = self.get_es_tor_exitnodes()

        # sync_tor_exitnodes() returns False on failure; an empty list leaves
        # nothing to match against, so enrichment is skipped in both cases.
        if iplist:
            hits = self.enrich_tor(iplist)
            ret["hits"]["hits"] = hits
            ret["hits"]["total"] = len(hits)

        self.logger.info(
            "finished running module. result: %s hits", ret["hits"]["total"]
        )
        return ret

    def sync_tor_exitnodes(self):
        """Download the Tor exit node list and replace the ES copy with it.

        The existing ES documents are only deleted after a successful HTTP
        response containing at least one valid IP address, so an outage or an
        error page never wipes the cache or gets indexed as IPs.

        Returns:
            list | bool: the exit node IPs as plain addresses (no prefix
            length), or False if the download or the ES update failed.
        """
        try:
            # 1. Get tor exit nodes (fail on HTTP errors instead of parsing
            #    an error page as an IP list)
            response = requests.get(self.tor_exitlist_url, timeout=self.http_timeout)
            response.raise_for_status()
            iplist_tor = sorted(self._normalize_ips(response.text.splitlines()))
            if not iplist_tor:
                self.logger.warning(
                    "Tor exit node list is empty, keeping existing iplist tor exit nodes"
                )
                return False

            # 2. Delete existing nodes
            es.delete_by_query(
                index="redelk-*",
                body={"query": {"bool": {"filter": {"term": {"iplist.name": "tor"}}}}},
            )

            # 3. Add new data, stored as single-host CIDRs (/32 or /128)
            now = datetime.datetime.utcnow().isoformat()
            iplist_doc = [
                {
                    "_source": {
                        "iplist": {
                            "ip": self._to_host_cidr(address),
                            "source": "enrich",
                            "name": "tor",
                        },
                        "@timestamp": now,
                    }
                }
                for address in iplist_tor
            ]
            helpers.bulk(es, iplist_doc, index="redelk-iplist-tor")
            self.logger.info("Successfuly updated iplist tor exit nodes")
            return iplist_tor
        except Exception as error:  # pylint: disable=broad-except
            self.logger.error("Failed updating iplist tor exit nodes: %s", error)
            self.logger.exception(error)
            return False

    def enrich_tor(self, iplist):
        """Return the redirtraffic documents whose source IP is a Tor exit node.

        Only documents not yet tagged ``enrich_tor`` and with an ``@timestamp``
        up to the last run of ``enrich_iplists`` are considered (to avoid a
        race condition with that module).

        Args:
            iplist: iterable of exit node IPs, either plain addresses or
                single-host CIDRs such as ``1.2.3.4/32``.

        Returns:
            list: the matching ES hit dicts.
        """
        iplist_lastrun = get_last_run("enrich_iplists")
        query = {
            "sort": [{"@timestamp": {"order": "desc"}}],
            "query": {
                "bool": {
                    "filter": [
                        {"range": {"@timestamp": {"lte": iplist_lastrun.isoformat()}}}
                    ],
                    "must_not": [{"match": {"tags": info["submodule"]}}],
                }
            },
        }
        res = raw_search(query, index="redirtraffic-*")
        not_enriched = [] if res is None else res["hits"]["hits"]

        # Normalize both sides so "1.2.3.4/32" (ES cache) matches "1.2.3.4"
        # (redirtraffic), and build the lookup set once for O(1) membership.
        tor_ips = self._normalize_ips(iplist)
        return [
            doc
            for doc in not_enriched
            if self._canonical_ip(get_value("_source.source.ip", doc)) in tor_ips
        ]

    def get_es_tor_exitnodes(self):
        """Return the Tor exit node IPs cached in ES as plain addresses.

        The ES documents store single-host CIDRs (e.g. ``1.2.3.4/32``). The
        prefix is stripped here so the result has the same form as the return
        value of sync_tor_exitnodes(). Returns an empty list when nothing is
        cached.
        """
        # NOTE(review): only as many documents as raw_search() returns are
        # read; confirm its default size covers the full exit node list.
        es_query = {"query": {"bool": {"filter": {"term": {"iplist.name": "tor"}}}}}
        es_result = raw_search(es_query, index="redelk-*")
        if not es_result:
            return []
        return sorted(
            self._normalize_ips(
                get_value("_source.iplist.ip", ipdoc)
                for ipdoc in es_result["hits"]["hits"]
            )
        )

    def get_last_sync(self):
        """Return the ``@timestamp`` of the newest cached Tor exit node document.

        Returns ``datetime.datetime.fromtimestamp(0)`` (which forces a resync)
        when nothing is cached or the stored timestamp cannot be parsed.
        """
        es_query = {
            "size": 1,
            "sort": [{"@timestamp": {"order": "desc"}}],
            "query": {"bool": {"filter": [{"term": {"iplist.name": "tor"}}]}},
        }
        es_results = raw_search(es_query, index="redelk-*")
        self.logger.debug(es_results)
        # Return the latest hit, or the epoch if not found
        if es_results and len(es_results["hits"]["hits"]) > 0:
            dt_str = get_value("_source.@timestamp", es_results["hits"]["hits"][0])
            dtime = self._parse_timestamp(dt_str)
            if dtime is not None:
                return dtime
            self.logger.warning("Could not parse last tor sync timestamp: %s", dt_str)
        return datetime.datetime.fromtimestamp(0)

    @staticmethod
    def _parse_timestamp(dt_str):
        """Parse a naive timestamp as produced by ``datetime.isoformat()``.

        isoformat() omits the fractional seconds when microsecond == 0, so
        both forms are accepted. Returns None when neither format matches.
        """
        for fmt in ("%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S"):
            try:
                return datetime.datetime.strptime(dt_str, fmt)
            except (TypeError, ValueError):
                continue
        return None

    @staticmethod
    def _canonical_ip(value):
        """Return the single host address in *value* as canonical text, or None.

        Accepts plain addresses and single-host CIDRs (``/32``, ``/128``).
        Returns None for empty values, non-IP text, and multi-host networks.
        """
        if not value:
            return None
        try:
            iface = ipaddress.ip_interface(str(value).strip())
        except ValueError:
            return None
        if iface.network.num_addresses != 1:
            return None
        return str(iface.ip)

    @classmethod
    def _normalize_ips(cls, entries):
        """Return the set of canonical host addresses found in *entries*.

        Entries that are not single host addresses are skipped.
        """
        ips = set()
        for entry in entries:
            address = cls._canonical_ip(entry)
            if address is not None:
                ips.add(address)
        return ips

    @staticmethod
    def _to_host_cidr(address):
        """Return *address* as a single-host CIDR: /32 for IPv4, /128 for IPv6."""
        parsed = ipaddress.ip_address(address)
        return f"{parsed}/{parsed.max_prefixlen}"