def load_rules()

in elastic/security/parameter_sources/events_emitter.py [0:0]


def load_rules(track, params):
    """Lazily yield the detection rules that apply to the track's data streams.

    This is a generator: nothing (including parameter validation) runs until
    the first item is requested.

    Rule files are TOML documents located with the glob pattern
    ``<resource_dir>/*/<params["rules"]["path"]>`` (``recursive=True``, so
    ``**`` is honoured), where ``resource_dir`` is the directory provided by
    the ``resource(track, uri)`` context manager. A rule is yielded only if:

    * its ``type`` is ``eql`` or ``query`` and its ``language`` is ``eql`` or
      ``kuery``;
    * no tag filter is configured, or it shares at least one tag
      (compared through ``set_to_lower``) with ``params["rules"]["tags"]``;
    * at least one of its ``index`` patterns matches a data stream of the
      track.

    Args:
        track: Object exposing ``data_streams``; also handed to ``resource``.
        params: Mapping with a ``"rules"`` entry holding the required ``uri``
            and ``path`` keys and an optional ``tags`` list.

    Yields:
        ``types.SimpleNamespace`` built from the ``[rule]`` table of each
        accepted file. ``index`` is replaced by the list of matching data
        stream names (each listed once) and ``filename`` is set to the path
        the rule was loaded from.

    Raises:
        ValueError: If ``rules.uri`` or ``rules.path`` is not configured.
    """
    rules_params = params["rules"]
    if "uri" not in rules_params:
        raise ValueError("Required param 'rules.uri' is not configured")
    if "path" not in rules_params:
        raise ValueError("Required param 'rules.path' is not configured")

    # NOTE(review): set_to_lower is assumed to return a set (it is used with
    # `&` and `sorted` below) -- confirm against its definition.
    tags = set_to_lower(rules_params.get("tags", []))
    logger.info(f"Rule tags: {', '.join(sorted(tags)) or '<none>'}")

    with resource(track, rules_params["uri"]) as resource_dir:
        # Imported lazily so the dependency is only needed when rules are loaded.
        import pytoml

        # The data stream names do not change per rule; compute them once.
        data_streams = [str(ds) for ds in track.data_streams]

        for filename in glob(os.path.join(resource_dir, "*", rules_params["path"]), recursive=True):
            try:
                # TOML documents are UTF-8 by specification; do not rely on
                # the platform's default encoding.
                with open(filename, encoding="utf-8") as f:
                    rule = pytoml.load(f)["rule"]
            except Exception as e:
                # Best effort: a broken file must not prevent loading the others.
                logger.error(f"[{e}] while loading from [{filename}]")
                continue

            # Use .get() so that a rule lacking one of these fields is skipped
            # instead of aborting the whole generator with a KeyError.
            if rule.get("type") not in ("eql", "query") or rule.get("language") not in ("eql", "kuery"):
                continue
            if tags and not (tags & set_to_lower(rule.get("tags", []))):
                continue

            # Keep each data stream at most once, even when several of the
            # rule's index patterns match it.
            patterns = rule.get("index", [])
            rule["index"] = [ds for ds in data_streams if any(fnmatch(ds, pattern) for pattern in patterns)]
            if not rule["index"]:
                continue

            rule["filename"] = filename
            yield SimpleNamespace(**rule)