core/__init__.py (265 lines of code) (raw):
# Copyright 2025 Elasticsearch B.V.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Core definitions for creating Elastic Pipes components."""
import logging
import sys
from abc import ABC, abstractmethod
from collections import namedtuple
from collections.abc import Mapping, Sequence
from contextlib import ExitStack
from typing_extensions import Annotated, Any, NoDefault, get_args
from .errors import ConfigError, Error
from .util import get_node, has_node, is_mutable, set_node
# Version of this package; the "elastic.pipes" pipe defined in this module
# compares it against the optional `minimum-version` config node.
__version__ = "0.7.0-dev"
def _indirect(node):
    """Return the name of the indirection node that goes with `node`.

    The indirection node has the same name as the regular one plus a
    trailing ``@``; the handlers in this module look it up in the pipe
    configuration to find out where the actual value lives.
    """
    marker = "@"
    return node + marker
def validate_logging_config(name, config):
    """Validate the optional ``logging.level`` node of a pipe configuration.

    Args:
        name: Name of the pipe the configuration belongs to; only used in
            the error message.
        config: Configuration mapping of the pipe.

    Raises:
        ConfigError: If ``logging.level`` is present (not ``None``) but is
            not a string naming an integer level constant of the
            ``logging`` module (case-insensitive, e.g. ``"debug"``).
    """
    level = get_node(config, "logging.level", None)
    if level is None:
        # the node is absent (or explicitly null): nothing to validate
        return

    # Only strings can name a level. Anything else (numbers, booleans,
    # mappings, ...) is rejected here rather than failing later with an
    # AttributeError on `.upper()` when the level gets applied to a logger.
    level_nr = getattr(logging, level.upper(), None) if isinstance(level, str) else None
    if not isinstance(level_nr, int):
        raise ConfigError(f"invalid configuration: pipe '{name}': node 'logging.level': value '{level}'")
def get_pipes(state):
    """Extract and validate the pipes listed in the ``pipes`` entry of `state`.

    Each entry of ``pipes`` must be a mapping with exactly one key, the pipe
    name, whose value is the configuration of that pipe (a mapping, or
    ``None`` which is treated as an empty configuration).

    Args:
        state: The state mapping, or ``None`` (treated as an empty state).

    Returns:
        A list of ``(name, config)`` tuples, in the order of appearance.

    Raises:
        ConfigError: If the state, the pipes sequence, a pipe entry or a pipe
            configuration does not have the expected shape, or if the
            ``logging.level`` of a pipe is invalid.
    """
    if state is None:
        state = {}
    if not isinstance(state, Mapping):
        raise ConfigError(f"invalid state: not a mapping: {state} ({type(state).__name__})")

    pipes = state.get("pipes", [])
    if pipes is None:
        pipes = []
    if not isinstance(pipes, Sequence):
        raise ConfigError(f"invalid pipes configuration: not a sequence: {pipes} ({type(pipes).__name__})")

    configs = []
    for pipe in pipes:
        if not isinstance(pipe, Mapping):
            raise ConfigError(f"invalid pipe configuration: not a mapping: {pipe} ({type(pipe).__name__})")
        if not pipe:
            # an empty mapping has no name at all, do not report it as "multiple names"
            raise ConfigError("invalid pipe configuration: missing pipe name")
        if len(pipe) > 1:
            # keys are converted explicitly so that non-string names cannot break the join
            raise ConfigError(f"invalid pipe configuration: multiple pipe names: {', '.join(map(str, pipe))}")

        # exactly one entry: the key is the pipe name, the value its configuration
        name, config = next(iter(pipe.items()))
        if config is None:
            config = {}
        if not isinstance(config, Mapping):
            raise ConfigError(f"invalid pipe configuration: not a mapping: {config} ({type(config).__name__})")

        validate_logging_config(name, config)
        configs.append((name, config))
    return configs
class Pipe:
    """Decorator that registers a function as a named pipe and knows how to run it.

    ``@Pipe("name")`` stores the decorated function in a registry shared by
    all the pipes and replaces it with a partial of the standalone runner.
    When the pipe is run, the arguments of the function are resolved from its
    parameter annotations:

    - ``dry_run`` (by name) receives the dry run flag;
    - parameters annotated with `Pipe`, `logging.Logger`, `ExitStack` or a
      `Pipe.Context` sub-class receive, respectively, the pipe itself, the
      logger of the pipe, the exit stack given to `run` and a bound context;
    - parameters annotated with ``Annotated[type, Pipe.Config(...)]`` or
      ``Annotated[type, Pipe.State(...)]`` receive the value of the
      corresponding config or state node.
    """

    # Registry of the defined pipes, by name; shared by all the instances.
    __pipes__ = {}

    def __init__(self, name, *, default=sys.exit, notes=None, closing_notes=None):
        """Prepare a pipe named `name`; the function is attached by `__call__`.

        Args:
            name: Unique name of the pipe, also used as name of its logger.
            default: Stored as ``self.default``; not used in this module.
            notes: Stored as ``self.notes``; not used in this module.
            closing_notes: Stored as ``self.closing_notes``; not used in this module.
        """
        self.func = None
        self.name = name
        self.notes = notes
        self.closing_notes = closing_notes
        self.default = default
        self.logger = logging.getLogger(name)
        # records are emitted only by the handlers attached to this logger
        # (see sync_logger_config), never by those of its ancestors
        self.logger.propagate = False

    def __call__(self, func):
        """Register `func` as the implementation of this pipe.

        Returns:
            A partial of the standalone ``run`` function bound to this pipe,
            which replaces the decorated function.

        Raises:
            ConfigError: If a pipe with the same name is already registered.
        """
        from functools import partial

        from .standalone import run

        if self.name in self.__pipes__:
            module = self.__pipes__[self.name].func.__module__
            raise ConfigError(f"pipe '{self.name}' is already defined in module '{module}'")

        self.__pipes__[self.name] = self
        self.func = func
        return partial(run, self)

    @classmethod
    def find(cls, name):
        """Return the registered pipe named `name`.

        Raises:
            KeyError: If no pipe with such a name is registered.
        """
        return cls.__pipes__[name]

    def run(self, config, state, dry_run, logger, exit_stack):
        """Resolve the arguments of the pipe function and execute it.

        Args:
            config: Configuration mapping of this pipe.
            state: State mapping shared with the pipe.
            dry_run: Whether this is a dry run. In a dry run the function is
                executed only if it declares a ``dry_run`` parameter.
            logger: Logger used for the messages of the binding machinery.
            exit_stack: Passed as is to parameters annotated with `ExitStack`.

        Returns:
            What the pipe function returns, or ``None`` if it is not executed.

        Raises:
            Error: If a parameter without default refers to a missing node.
        """
        from inspect import signature

        sync_logger_config(self.logger, config)

        params = signature(self.func).parameters

        if not dry_run:
            logger.debug(f"executing pipe '{self.name}'...")
        elif "dry_run" in params:
            logger.debug(f"dry executing pipe '{self.name}'...")
        else:
            logger.debug(f"not executing pipe '{self.name}'...")

        # the contexts bound below are exited when this block is left, that
        # is after the pipe function has returned (or raised)
        with ExitStack() as stack:
            kwargs = {}
            for name, param in params.items():
                if name == "dry_run":
                    kwargs["dry_run"] = dry_run
                    continue

                if isinstance(param.annotation, type):
                    if issubclass(param.annotation, Pipe):
                        kwargs[name] = self
                    elif issubclass(param.annotation, logging.Logger):
                        kwargs[name] = self.logger
                    elif issubclass(param.annotation, ExitStack):
                        kwargs[name] = exit_stack
                    elif issubclass(param.annotation, Pipe.Context):
                        kwargs[name] = param.annotation.bind(stack, config, state, logger)
                    continue

                # Annotated[type, metadata...]: the first argument is the
                # expected type, the node descriptor is among the others
                args = get_args(param.annotation)
                for arg in args:
                    if isinstance(arg, Pipe.Node):
                        node_param = Pipe.Node.Param(name, args[0], param.default, param.empty)
                        _, getter, _ = arg.handle_param(node_param, config, state, logger)
                        try:
                            kwargs[name] = getter(None)
                        except KeyError as e:
                            # a missing node is a user error, not a lookup bug
                            raise Error(e.args[0]) from e

            if not dry_run or "dry_run" in kwargs:
                return self.func(**kwargs)

    class Help:
        """Plain holder of a help text."""

        def __init__(self, help):
            self.help = help

    class Notes:
        """Plain holder of some notes."""

        def __init__(self, notes):
            self.notes = notes

    class Context:
        """Base class of the user contexts, groups of node-bound attributes.

        The class attributes annotated with ``Annotated[type, <Pipe.Node>]``
        become properties reading from (and writing to) the bound node; those
        annotated with another `Pipe.Context` sub-class become nested bound
        contexts. A context is a context manager that does nothing on exit
        unless the sub-class overrides `__exit__`.
        """

        def __enter__(self):
            return self

        def __exit__(self, *_):
            pass

        @classmethod
        def bind(cls, stack, config, state, logger):
            """Create, enter and return an instance of this context bound to `config` and `state`.

            Args:
                stack: `ExitStack` in which the new instance is entered.
                config: Configuration mapping the config nodes are bound to.
                state: State mapping the state nodes are bound to.
                logger: Logger made available as ``logger`` attribute of the
                    context and used for the binding messages.

            Raises:
                Error: If an attribute without default refers to a missing node.
            """
            # define a new sub-type of the user's context, make it concrete or it cannot be instantiated
            sub = type(cls.__name__, (cls,), {"handle_param": None, "logger": logger})
            bindings = {}
            # a context may have no annotations at all, in which case the
            # attribute is not guaranteed to exist on every Python version
            for name, ann in getattr(cls, "__annotations__", {}).items():
                if isinstance(ann, type):
                    if issubclass(ann, Pipe.Context):
                        nested = ann.bind(stack, config, state, logger)
                        setattr(sub, name, nested)
                    continue

                args = get_args(ann)
                for arg in args:
                    if isinstance(arg, Pipe.Node):
                        # the class attribute, if any, is the default value
                        default = getattr(cls, name, NoDefault)
                        param = Pipe.Node.Param(name, args[0], default, NoDefault)
                        binding, getter, setter = arg.handle_param(param, config, state, logger)
                        setattr(sub, name, property(getter, setter))
                        bindings[name] = binding
                        # read the value once so that missing nodes are
                        # reported at bind time rather than on first access
                        try:
                            getter(None)
                        except KeyError as e:
                            raise Error(e.args[0]) from e

            setattr(sub, "__pipe_ctx_bindings__", bindings)
            return stack.enter_context(sub())

        @classmethod
        def get_binding(cls, name):
            """Return the binding of the attribute `name`, or ``None`` if it has none.

            Only meaningful on the types created by `bind`.
            """
            return cls.__pipe_ctx_bindings__.get(name)

    class Node(ABC):
        """Base class of the descriptors that tie a parameter to a node."""

        # name: parameter name; type: expected type of the value;
        # default: default value; empty: the sentinel meaning "no default"
        Param = namedtuple("Param", ["name", "type", "default", "empty"])

        class Binding:
            """Where a parameter is currently bound; filled in by `handle_param`."""

            node: str  # path of the bound node
            root: dict  # mapping the node is looked up in
            root_name: str  # name of the root used in the messages ("config" or "state")

        def __init__(self, node):
            self.node = node

        @abstractmethod
        def handle_param(self, param, config, state, logger):
            """Bind `param` and return the ``(binding, getter, setter)`` triple.

            ``getter(obj)`` and ``setter(obj, value)`` have the signature of
            property accessors; their first argument is ignored.
            """
            pass

        @staticmethod
        def _reject_mutable_default(param):
            """Raise `TypeError` if `param` has a default and the default is mutable."""
            if param.default is not param.empty and is_mutable(param.default):
                raise TypeError(f"param '{param.name}': mutable default not allowed: {param.default}")

        @staticmethod
        def _make_default_action(param, binding):
            """Return the callable handed to `get_node` as ``default_action``.

            It returns the default of `param` or, if there is none, raises a
            `KeyError` describing the node `binding` points to at that time.
            """

            def default_action():
                if param.default is param.empty:
                    raise KeyError(f"param '{param.name}': {binding.root_name} node not found: '{binding.node}'")
                return param.default

            return default_action

        @staticmethod
        def _check_type(param, binding, value):
            """Return `value` if it is ``None`` or matches the type of `param`, raise `Error` otherwise."""
            if value is None or param.type is Any or isinstance(value, param.type):
                return value
            value_type = type(value).__name__
            expected_type = param.type.__name__
            raise Error(
                f"param '{param.name}': {binding.root_name} node '{binding.node}' type mismatch: '{value_type}' (expected '{expected_type}')"
            )

    class Config(Node):
        """Bind a parameter to a node of the pipe configuration.

        If the configuration contains the indirection node (same name plus a
        trailing ``@``) instead of the node itself, its value is the path of
        the state node the parameter is read from.
        """

        def handle_param(self, param, config, state, logger):
            """Bind `param` to the config node or, if indirect, to a state node.

            Returns:
                The ``(binding, getter, setter)`` triple. The setter always
                writes to the configuration, re-binding the parameter first
                if needed.

            Raises:
                TypeError: If the default of `param` is mutable.
                ConfigError: If the configuration has both the node and its
                    indirection.
            """
            self._reject_mutable_default(param)

            has_value = has_node(config, self.node)
            has_indirect = has_node(config, _indirect(self.node))
            if has_value and has_indirect:
                raise ConfigError(f"param '{param.name}': config cannot specify both '{self.node}' and '{_indirect(self.node)}'")

            binding = Pipe.Node.Binding()
            if has_indirect:
                binding.node = get_node(config, _indirect(self.node))
                binding.root = state
                binding.root_name = "state"
            else:
                binding.node = self.node
                binding.root = config
                binding.root_name = "config"
            logger.debug(f" bind param '{param.name}' to {binding.root_name} node '{binding.node}'")

            default_action = self._make_default_action(param, binding)

            def getter(_):
                value = get_node(binding.root, binding.node, default_action=default_action)
                return self._check_type(param, binding, value)

            def setter(_, value):
                if binding.node != self.node or binding.root is not config or binding.root_name != "config":
                    binding.node = self.node
                    binding.root = config
                    binding.root_name = "config"
                    logger.debug(f" re-bind param '{param.name}' to {binding.root_name} node '{binding.node}'")
                    # the value is about to be stored in the config, drop the
                    # indirection so that the two do not coexist
                    # NOTE(review): this pops a top-level key; it looks like it
                    # cannot work for nested (dotted) nodes -- confirm
                    config.pop(_indirect(self.node))
                set_node(binding.root, binding.node, value)

            return binding, getter, setter

    class State(Node):
        """Bind a parameter to a node of the state, or to the whole state.

        Args:
            node: Path of the state node, ``None`` for the whole state.
            indirect: ``True`` if the configuration may redirect the binding
                by means of the indirection node of `node`, a string to use
                the indirection node of that name instead, a false value to
                disable the indirection. With no `node`, anything but a
                string disables the indirection.
            mutable: Whether the parameter can be assigned to and can have a
                mutable value.
        """

        def __init__(self, node, *, indirect=True, mutable=False):
            super().__init__(node)
            self.indirect = indirect
            self.mutable = mutable
            if node is None and not isinstance(indirect, str):
                # there is no node name to derive the indirection from
                self.indirect = False

        def handle_param(self, param, config, state, logger):
            """Bind `param` to the state node, honouring the indirection if any.

            Returns:
                The ``(binding, getter, setter)`` triple. The getter raises
                `AttributeError` on mutable values and the setter on any
                assignment, unless the node is marked as mutable.

            Raises:
                TypeError: If the default of `param` is mutable.
            """
            self._reject_mutable_default(param)

            if self.indirect:
                indirect = _indirect(self.node if self.indirect is True else self.indirect)
                has_indirect = has_node(config, indirect)
            else:
                has_indirect = False

            node = get_node(config, indirect) if has_indirect else self.node

            if node is None:
                logger.debug(f" bind param '{param.name}' to the whole state")
            else:
                logger.debug(f" bind param '{param.name}' to state node '{node}'")

            binding = Pipe.Node.Binding()
            binding.node = node
            binding.root = state
            binding.root_name = "state"

            default_action = self._make_default_action(param, binding)

            def getter(_):
                value = get_node(binding.root, binding.node, default_action=default_action)
                if value is not None and is_mutable(value) and not self.mutable:
                    raise AttributeError(f"param '{param.name}' is mutable but not marked as such")
                return self._check_type(param, binding, value)

            def setter(_, value):
                if not self.mutable:
                    raise AttributeError(f"param '{param.name}' is not mutable")
                # restore the binding in case it was altered after its creation
                if binding.node != node or binding.root is not state or binding.root_name != "state":
                    binding.node = node
                    binding.root = state
                    binding.root_name = "state"
                    logger.debug(f" re-bind param '{param.name}' to {binding.root_name} node '{binding.node}'")
                set_node(binding.root, binding.node, value)

            return binding, getter, setter
def sync_logger_config(logger, config):
    """Align `logger` with the ``elastic.pipes`` logger.

    The handlers of `logger` are replaced with those of the
    ``elastic.pipes`` logger. The level is taken from the ``logging.level``
    node of `config`, unless the node is missing or the ``elastic.pipes``
    logger has a true ``overridden`` attribute, in which case the level of
    the ``elastic.pipes`` logger is used.

    Nothing is done if `logger` is the ``elastic.pipes`` logger itself.
    """
    reference = logging.getLogger("elastic.pipes")
    if logger == reference:
        return

    # detach the current handlers, last one first
    for stale in logger.handlers[::-1]:
        logger.removeHandler(stale)
    # then share the handlers of the reference logger
    for shared in reference.handlers:
        logger.addHandler(shared)

    level = get_node(config, "logging.level", None)
    if level is not None and not getattr(reference, "overridden", False):
        logger.setLevel(level.upper())
    else:
        logger.setLevel(reference.level)
@Pipe("elastic.pipes")
def _elastic_pipes(
    log: logging.Logger,
    level: Annotated[str, Pipe.Config("logging.level")] = None,
    min_version: Annotated[str, Pipe.Config("minimum-version")] = None,
    dry_run: bool = False,
):
    """Apply the settings of the ``elastic.pipes`` pipe itself.

    Sets the level of `log` from the ``logging.level`` config node, unless
    `log` has a true ``overridden`` attribute, and checks that the version of
    this package is not older than the ``minimum-version`` config node.

    The `dry_run` parameter is not used in the body; declaring it makes the
    pipe execute also during dry runs.

    Raises:
        ConfigError: If the current version is older than `min_version`.
    """
    if not (level is None or getattr(log, "overridden", False)):
        log.setLevel(level.upper())

    if min_version is None:
        return

    # imported only when a minimum version is actually requested
    from semver import VersionInfo

    current = VersionInfo.parse(__version__)
    required = VersionInfo.parse(min_version)
    if current < required:
        raise ConfigError(f"current version is older than minimum version: {__version__} < {min_version}")