def __init__()

in elastic/security/parameter_sources/events_emitter.py [0:0]


    def __init__(self, track, params, **kwargs):
        """Build the pool of source events from the configured rules and/or queries.

        Values read from ``params``:
            index: optional target index; overrides each rule's own first index
                and is mandatory when ``queries`` is given.
            bulk-batch-size: stored on the instance, defaults to 100.
            request-timeout: stored on the instance, defaults to None.
            number-of-alerts: required, stored on the instance.
            rules: when present, rules are obtained via ``load_rules(track, params)``.
            queries: when present, an iterable of queries added as-is.

        Keyword arguments:
            _test_schema: when supplied, used as the schema instead of calling
                ``load_schema(track, params)``.

        Rules or queries that fail to be added are logged and skipped; they do
        not abort construction.

        Raises:
            KeyError: if ``number-of-alerts`` is missing from ``params``.
            ValueError: if neither ``rules`` nor ``queries`` is configured, if
                ``queries`` is configured without ``index``, or if nothing
                could be loaded successfully.
        """
        # Imported lazily so the dependency is only needed when this class is instantiated.
        from geneve import SourceEvents

        # Only fall back to load_schema() when no test schema was injected.
        schema = kwargs["_test_schema"] if "_test_schema" in kwargs else load_schema(track, params)
        self.source_events = SourceEvents(schema)
        self.index = params.get("index", None)
        self.bulk_batch_size = params.get("bulk-batch-size", 100)
        self.request_timeout = params.get("request-timeout", None)
        self.number_of_alerts = params["number-of-alerts"]

        # Number of successfully added rules/queries per target index (for logging only).
        index_stats = {}

        if "rules" not in params and "queries" not in params:
            raise ValueError("Either param 'rules' or 'queries' must be configured")

        if "rules" in params:
            for rule in load_rules(track, params):
                # An explicit 'index' param takes precedence over the rule's own index.
                # NOTE(review): assumes rule.index is a non-empty sequence — confirm against load_rules.
                index = self.index or rule.index[0]

                try:
                    self.source_events.add_rule(rule, meta={"index": index})
                except Exception as e:
                    logger.error(f"[{e}] while adding rule [{rule.filename}]")
                    continue

                index_stats[index] = index_stats.get(index, 0) + 1

        if "queries" in params:
            if not self.index:
                raise ValueError("Param 'queries' requires param 'index' to be configured")

            for query in params["queries"]:
                try:
                    self.source_events.add_query(query, meta={"index": self.index})
                except Exception as e:
                    logger.error(f"[{e}] while adding query [{query}]")
                    continue

                # Count the first query as 1 (not 0), consistent with the rules branch.
                index_stats[self.index] = index_stats.get(self.index, 0) + 1

        if not self.source_events:
            raise ValueError("No valid rules or queries were loaded")

        logger.info(f"Loaded {len(self.source_events)} roots")
        for index in sorted(index_stats):
            logger.info(f"Index {index}: {index_stats[index]}")