share/include_exlude.py (61 lines of code) (raw):

# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one # or more contributor license agreements. Licensed under the Elastic License 2.0; # you may not use this file except in compliance with the Elastic License 2.0. from __future__ import annotations import re from typing import Optional class IncludeExcludeRule: """ IncludeExcludeRule represents a pattern rule """ def __init__(self, pattern: str): self.pattern = re.compile(pattern) def __eq__(self, other: object) -> bool: assert isinstance(other, IncludeExcludeRule) return self.pattern == other.pattern class IncludeExcludeFilter: """ Base class for IncludeExclude filter """ def __init__( self, include_patterns: Optional[list[IncludeExcludeRule]] = None, exclude_patterns: Optional[list[IncludeExcludeRule]] = None, ): self._include_rules: Optional[list[IncludeExcludeRule]] = None self._exclude_rules: Optional[list[IncludeExcludeRule]] = None if include_patterns is not None and len(include_patterns) > 0: self.include_rules = include_patterns if exclude_patterns is not None and len(exclude_patterns) > 0: self.exclude_rules = exclude_patterns self._always_yield = self._include_rules is None and self._exclude_rules is None self._include_only = self._include_rules is not None and self._exclude_rules is None self._exclude_only = self._exclude_rules is not None and self._include_rules is None def _is_included(self, message: str) -> bool: assert self._include_rules is not None for include_rule in self._include_rules: if include_rule.pattern.search(message) is not None: return True return False def _is_excluded(self, message: str) -> bool: assert self._exclude_rules is not None for exclude_rule in self._exclude_rules: if exclude_rule.pattern.search(message) is not None: return True return False def filter(self, message: str) -> bool: """ filter returns True if the event is included or not excluded """ if self._always_yield: return True if self._include_only: return self._is_included(message) if self._exclude_only: return not self._is_excluded(message) if self._is_excluded(message): return False return self._is_included(message) def __eq__(self, other: object) -> bool: assert isinstance(other, IncludeExcludeFilter) return self.include_rules == other.include_rules and self.exclude_rules == other.exclude_rules @property def include_rules(self) -> Optional[list[IncludeExcludeRule]]: return self._include_rules @include_rules.setter def include_rules(self, value: list[IncludeExcludeRule]) -> None: self._include_rules = value @property def exclude_rules(self) -> Optional[list[IncludeExcludeRule]]: return self._exclude_rules @exclude_rules.setter def exclude_rules(self, value: list[IncludeExcludeRule]) -> None: self._exclude_rules = value