in elastic/security/parameter_sources/events_emitter.py [0:0]
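def load_rules(track, params):
    """Yield the detection rules from the track's rule bundle that apply to this track.

    Args:
        track: the Rally track. ``track.data_streams`` is matched against each
            rule's ``index`` patterns; ``track`` is also passed to ``resource()``.
        params: parameter-source params. ``params["rules"]`` must contain
            ``uri`` (where the rule bundle is fetched from) and ``path`` (a glob
            relative to each top-level directory of the bundle); it may contain
            ``tags``, in which case only rules sharing at least one tag
            (compared case-insensitively) are kept.

    Yields:
        ``SimpleNamespace`` built from the rule file's ``[rule]`` table, with
        ``index`` replaced by the names of the track's data streams it covers
        (each at most once) and ``filename`` set to the source file. Any
        ``filename`` key present in the file is overwritten.

    Raises:
        ValueError: if ``rules.uri`` or ``rules.path`` is not configured.
    """
    rule_params = params["rules"]
    for key in ("uri", "path"):
        if key not in rule_params:
            raise ValueError(f"Required param 'rules.{key}' is not configured")
    tags = set_to_lower(rule_params.get("tags", []))
    logger.info(f"Rule tags: {', '.join(sorted(tags)) or '<none>'}")

    # The track's data streams do not change during loading; stringify them once
    # instead of once per rule.
    data_streams = [str(ds) for ds in track.data_streams]

    with resource(track, rule_params["uri"]) as resource_dir:
        import pytoml  # local import: only needed while rules are being read

        # NOTE(review): "*" matches one directory level; recursive=True only has
        # an effect if rules.path itself contains "**" — confirm this is intended.
        pattern = os.path.join(resource_dir, "*", rule_params["path"])
        for filename in glob(pattern, recursive=True):
            # The bundle may contain files that are not well-formed rules. A file
            # that cannot be read or parsed is logged with its path and skipped
            # so one bad file does not abort the whole generator.
            try:
                with open(filename, encoding="utf-8") as f:
                    rule = pytoml.load(f)["rule"]
            except Exception as e:
                logger.error(f"[{e}] while loading from [{filename}]")
                continue

            # Only EQL and KQL rules can be executed by this emitter. Missing
            # keys are treated like a non-matching value rather than raising.
            if rule.get("type") not in ("eql", "query") or rule.get("language") not in ("eql", "kuery"):
                continue
            if tags and not (tags & set_to_lower(rule.get("tags", []))):
                continue

            # Narrow the rule's index patterns to the data streams of this track;
            # a rule that covers none of them is irrelevant here.
            index_patterns = rule.get("index", [])
            rule["index"] = [ds for ds in data_streams if any(fnmatch(ds, idx) for idx in index_patterns)]
            if not rule["index"]:
                continue

            rule["filename"] = filename
            yield SimpleNamespace(**rule)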