in elastic/security/parameter_sources/events_emitter.py [0:0]
def __init__(self, track, params, **kwargs):
from geneve import SourceEvents
schema = kwargs["_test_schema"] if "_test_schema" in kwargs else load_schema(track, params)
self.source_events = SourceEvents(schema)
self.index = params.get("index", None)
self.bulk_batch_size = params.get("bulk-batch-size", 100)
self.request_timeout = params.get("request-timeout", None)
self.number_of_alerts = params["number-of-alerts"]
index_stats = {}
if "rules" not in params and "queries" not in params:
raise ValueError("Either param 'rules' or 'queries' must be configured")
if "rules" in params:
for rule in load_rules(track, params):
index = self.index or rule.index[0]
try:
self.source_events.add_rule(rule, meta={"index": index})
except Exception as e:
logger.error(f"[{e}] while adding rule [{rule.filename}]")
continue
if index not in index_stats:
index_stats[index] = 1
else:
index_stats[index] += 1
if "queries" in params:
if not self.index:
raise ValueError("Param 'queries' requires param 'index' to be configured")
for query in params.get("queries", []):
try:
self.source_events.add_query(query, meta={"index": self.index})
except Exception as e:
logger.error(f"[{e}] while adding query [{query}]")
continue
if self.index not in index_stats:
index_stats[self.index] = 0
else:
index_stats[self.index] += 1
if not self.source_events:
raise ValueError("No valid rules or queries were loaded")
logger.info(f"Loaded {len(self.source_events)} roots")
for index in sorted(index_stats):
logger.info(f"Index {index}: {index_stats[index]}")