elastic/security/parameter_sources/events_emitter.py
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import os
import shutil
from contextlib import contextmanager
from fnmatch import fnmatch
from glob import glob
from itertools import chain, islice
from tempfile import mkdtemp
from types import SimpleNamespace
from urllib.parse import urlparse
import esrally.utils.net

logger = logging.getLogger(__name__)


def set_to_lower(iterable):
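    """Return a set with every element of `iterable` lower-cased."""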
    return set(x.lower() for x in iterable)


@contextmanager
def resource(track, uri):
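    """Fetch the archive at `uri`, unpack it into a temporary directory and
    yield that directory; the directory is deleted when the context exits.
    """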
tmpdir = mkdtemp()
uri_parts = urlparse(uri)
if uri_parts.scheme.startswith("http"):
uri_file = uri_parts.path.split("/")[-1]
local_file = os.path.join(tmpdir, uri_file)
esrally.utils.net.download_http(uri, local_file)
elif uri_parts.scheme == "file":
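        # a 'file://./...' URI (netloc of '.') is resolved relative to the track root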
if uri_parts.netloc == ".":
local_file = os.path.join(track.root, "." + uri_parts.path)
else:
local_file = uri_parts.path
else:
raise ValueError(f"uri scheme not supported: {uri_parts.scheme}")
shutil.unpack_archive(local_file, tmpdir)
try:
yield tmpdir
finally:
        shutil.rmtree(tmpdir)


def load_schema(track, params):
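    """Load the YAML schema referenced by params['schema'] from its unpacked resource."""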
if "schema" not in params:
raise ValueError("Required param 'schema' is not configured")
if "uri" not in params["schema"]:
raise ValueError("Required param 'schema.uri' is not configured")
if "path" not in params["schema"]:
raise ValueError("Required param 'schema.path' is not configured")
with resource(track, params["schema"]["uri"]) as resource_dir:
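        # the '*' component skips over the archive's single top-level directory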
files = glob(os.path.join(resource_dir, "*", params["schema"]["path"]), recursive=True)
if len(files) < 1:
raise ValueError(f"File not found in '{resource_dir}': '{params['schema']['path']}'")
if len(files) > 1:
raise ValueError(f"Too many files: {files}")
logger.info(f"schema: {files[0]}")
with open(files[0]) as f:
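            # imported lazily, so this module can be loaded even where PyYAML is not installed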
import yaml
            return yaml.safe_load(f)


def load_rules(track, params):
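    """Yield the detection rules selected by params['rules'] as SimpleNamespace objects.

    Rules are skipped unless they are EQL or query rules, carry one of the requested
    tags (when 'tags' is configured) and match at least one of the track's data streams.
    """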
if "uri" not in params["rules"]:
raise ValueError("Required param 'rules.uri' is not configured")
if "path" not in params["rules"]:
raise ValueError("Required param 'rules.path' is not configured")
tags = set_to_lower(params["rules"].get("tags", []))
logger.info(f"Rule tags: {', '.join(sorted(tags)) or '<none>'}")
with resource(track, params["rules"]["uri"]) as resource_dir:
import pytoml
for filename in glob(os.path.join(resource_dir, "*", params["rules"]["path"]), recursive=True):
try:
with open(filename) as f:
rule = pytoml.load(f)["rule"]
except Exception as e:
logger.error(f"[{e}] while loading from [{filename}]")
continue
if rule["type"] not in ("eql", "query") or rule["language"] not in ("eql", "kuery"):
continue
if tags and not (tags & set_to_lower(rule.get("tags", []))):
continue
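        # replace the rule's index patterns with the track's matching data streams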
rule["index"] = [str(ds) for ds in track.data_streams for idx in rule["index"] if fnmatch(str(ds), idx)]
if not rule["index"]:
continue
rule["filename"] = filename
        yield SimpleNamespace(**rule)


def batch_sizes(total_count, batch_size):
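    """Yield chunk sizes summing to `total_count`, each at most `batch_size`."""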
while total_count:
this_batch_size = min(total_count, batch_size)
total_count -= this_batch_size
        yield this_batch_size


def batches(iterable, total_count, batch_size):
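    """Split `total_count` entries drawn from `iterable` into flattened batches;
    each entry is expected to be iterable itself.
    """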
for this_batch_size in batch_sizes(total_count, batch_size):
        yield chain(*islice(iterable, this_batch_size))


class EventsEmitterParamSource:
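    """Rally parameter source that generates source events for detection rules or queries."""
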
def __init__(self, track, params, **kwargs):
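        # imported lazily, so Rally can import this module even before geneve is installed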
from geneve import SourceEvents
schema = kwargs["_test_schema"] if "_test_schema" in kwargs else load_schema(track, params)
self.source_events = SourceEvents(schema)
        self.index = params.get("index")
        self.bulk_batch_size = params.get("bulk-batch-size", 100)
        self.request_timeout = params.get("request-timeout")
self.number_of_alerts = params["number-of-alerts"]
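        # per-index count of rules/queries successfully added to the generator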
index_stats = {}
if "rules" not in params and "queries" not in params:
raise ValueError("Either param 'rules' or 'queries' must be configured")
if "rules" in params:
for rule in load_rules(track, params):
index = self.index or rule.index[0]
try:
self.source_events.add_rule(rule, meta={"index": index})
except Exception as e:
logger.error(f"[{e}] while adding rule [{rule.filename}]")
continue
                index_stats[index] = index_stats.get(index, 0) + 1
if "queries" in params:
if not self.index:
raise ValueError("Param 'queries' requires param 'index' to be configured")
for query in params.get("queries", []):
try:
self.source_events.add_query(query, meta={"index": self.index})
except Exception as e:
logger.error(f"[{e}] while adding query [{query}]")
continue
                index_stats[self.index] = index_stats.get(self.index, 0) + 1
if not self.source_events:
raise ValueError("No valid rules or queries were loaded")
logger.info(f"Loaded {len(self.source_events)} roots")
for index in sorted(index_stats):
logger.info(f"Index {index}: {index_stats[index]}")
def partition(self, partition_index, total_partitions):
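        """Return self for every partition: all clients share this parameter source."""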
        return self

    def params(self):
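        """Build the runner parameters: a lazy stream of document batches plus the request timeout."""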
doc_batches = batches(self.source_events, self.number_of_alerts, self.bulk_batch_size)
return {
"doc-batches": doc_batches,
"request-timeout": self.request_timeout,
}