core/runner.py (107 lines of code) (raw):
#!/usr/bin/env python3
# Copyright 2025 Elasticsearch B.V.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from contextlib import ExitStack
from pathlib import Path
import typer
from typing_extensions import Annotated, List, Optional
from .util import fatal, get_node, set_node, setup_logging, warn_interactive
main = typer.Typer(pretty_exceptions_enable=False)
def parse_runtime_arguments(arguments):
import ast
args = {}
for arg in arguments:
name, *value = arg.split("=")
if not value:
set_node(args, name, None)
continue
value = value[0]
if not value:
set_node(args, name, None)
continue
try:
value = ast.literal_eval(value)
except Exception:
pass
set_node(args, name, value)
return args
@main.command()
def run(
config_file: typer.FileText,
dry_run: Annotated[bool, typer.Option()] = False,
log_level: Annotated[str, typer.Option(callback=setup_logging("INFO"))] = None,
arguments: Annotated[Optional[List[str]], typer.Option("--argument", "-a", help="Pass an argument to the Pipes runtime.")] = None,
):
"""
Run pipes
"""
import logging
from importlib import import_module
from . import Pipe, get_pipes
from .errors import Error
from .util import deserialize_yaml
logger = logging.getLogger("elastic.pipes.core")
try:
warn_interactive(config_file)
state = deserialize_yaml(config_file) or {}
except FileNotFoundError as e:
fatal(f"{e.strerror}: '{e.filename}'")
if not state:
fatal("invalid configuration, it's empty")
if config_file.name == "<stdin>":
base_dir = Path.cwd()
else:
base_dir = Path(config_file.name).parent
base_dir = str(base_dir.absolute())
if base_dir not in sys.path:
logger.debug(f"adding '{base_dir}' to the search path")
sys.path.append(base_dir)
state.setdefault("runtime", {}).update(
{
"base-dir": base_dir,
"in-memory-state": True,
}
)
if arguments:
state["runtime"].setdefault("arguments", {}).update(parse_runtime_arguments(arguments))
pipes = get_pipes(state)
if pipes:
name, config = pipes[0]
if name == "elastic.pipes":
for path in get_node(config, "search-path", None) or []:
path = str(Path(base_dir) / path)
if path not in sys.path:
logger.debug(f"adding '{path}' to the search path")
sys.path.append(path)
for name, config in pipes:
if name in Pipe.__pipes__:
continue
logger.debug(f"loading pipe '{name}'...")
try:
import_module(name)
except ModuleNotFoundError as e:
fatal(f"cannot load pipe '{name}': cannot find module: '{e.name}'")
if name not in Pipe.__pipes__:
fatal(f"module does not define a pipe: {name}")
with ExitStack() as stack:
for name, config in pipes:
pipe = Pipe.find(name)
try:
pipe.run(config, state, dry_run, logger, stack)
except Error as e:
pipe.logger.critical(e)
sys.exit(1)
@main.command()
def new_pipe(
pipe_file: Path,
force: Annotated[bool, typer.Option("--force", "-f")] = False,
):
"""
Create a new pipe module
"""
pipe_file = pipe_file.with_suffix(".py")
try:
with pipe_file.open("w" if force else "x") as f:
f.write(
f"""#!/usr/bin/env python3
from logging import Logger
from elastic.pipes.core import Pipe
from typing_extensions import Annotated
@Pipe("{pipe_file.stem}", default={{}}, notes="Use this example pipe as starting point for yours.")
def main(
log: Logger,
name: Annotated[str, Pipe.State("name"), Pipe.Help("to whom say hello")] = "world",
dry_run: bool = False,
):
\"\"\"Say hello to someone.\"\"\"
log.info(f"Hello, {{name}}!")
if __name__ == "__main__":
main()
"""
)
except FileExistsError as e:
fatal(f"{e.strerror}: '{e.filename}'")
# make it executable
mode = pipe_file.stat().st_mode
pipe_file.chmod(mode | 0o111)
@main.command()
def version():
"""
Print the version
"""
from ..core import __version__
print(__version__)