core/standalone.py (130 lines of code) (raw):
# Copyright 2025 Elasticsearch B.V.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Core definitions for stand alone pipes invocation."""
import logging
import sys
from contextlib import ExitStack
from . import get_pipes
from .errors import Error
from .util import (
deserialize_yaml,
fatal,
serialize_yaml,
setup_logging,
warn_interactive,
)
def receive_state_from_unix_pipe(logger, default):
logger.debug("awaiting state from standard input")
warn_interactive(sys.stdin)
state = deserialize_yaml(sys.stdin)
if state:
logger.debug("got state")
elif default is sys.exit:
logger.debug("no state, exiting")
sys.exit(1)
else:
logger.debug("using default state")
state = default
return state
def send_state_to_unix_pipe(logger, state):
logger.debug("relaying state to standard output")
serialize_yaml(sys.stdout, state)
def help_message(pipe):
from functools import partial
from rich import print
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
from . import Pipe, _indirect
from .util import walk_contexts, walk_params
pipe_doc = pipe.func.__doc__
if not pipe_doc:
pipe_doc = "[i]This pipe has no description.[/i]"
config_entries = []
state_entries = []
for node, help, notes, type, default, empty in walk_params(pipe):
help = help or ""
if isinstance(node, Pipe.Config):
if notes is None:
notes = "" if default is empty else f"default: {repr(default)}"
config_entries.append([node.node, type.__name__, help, notes])
if isinstance(node, Pipe.State):
if node.node and node.node.startswith("runtime."):
continue
if node.node is not None:
if notes is None:
notes = "" if default is empty else f"default: {repr(default)}"
state_entries.append([node.node, type.__name__, help, notes])
elif node.indirect:
if notes is None:
notes = f"default: {repr(node.node)}"
config_entries.append([_indirect(node.indirect), type.__name__, help, notes])
notes = []
if pipe.notes:
notes += pipe.notes if isinstance(pipe.notes, list) else [str(pipe.notes)]
for ctx in walk_contexts(pipe):
ctx_notes = getattr(ctx, "notes", None) or []
notes += ctx_notes if isinstance(ctx_notes, list) else [str(ctx_notes)]
if pipe.closing_notes:
notes += pipe.closing_notes if isinstance(pipe.closing_notes, list) else [str(pipe.closing_notes)]
def _render_panel(title, entries):
table = Table(show_header=False, box=None, expand=False)
for entry in sorted(entries):
table.add_row(
Text(entry[0], style="bold cyan"),
Text(entry[1], style="bold yellow"),
entry[2],
Text(entry[3], style="dim"),
)
if not entries:
table.add_row("[i]none[/i]")
return Panel(table, title=title, title_align="left", border_style="dim")
def _render_notes(notes):
table = Table(show_header=False, box=None, expand=False)
for note in notes:
table.add_row(
Text("*", style="bold green"),
note,
)
return Panel(table, title="Notes", title_align="left", border_style="dim")
# print everything on standard error
print = partial(print, file=sys.stderr)
print(pipe_doc)
print()
print(_render_panel("Configuration parameters", config_entries))
print(_render_panel("State nodes", state_entries))
if notes:
print(_render_notes(notes))
print()
print("Use the [bold green]-p[/bold green] option to execute in UNIX pipe mode.")
def run(pipe):
import typer
from typing_extensions import Annotated
def _main(
dry_run: Annotated[bool, typer.Option()] = False,
log_level: Annotated[str, typer.Option(callback=setup_logging("DEBUG"))] = None,
pipe_mode: Annotated[
bool,
typer.Option(
"--pipe-mode",
"-p",
rich_help_panel="UNIX pipe mode",
help="Read state from standard input and write state to standard output. This is the default mode when executed in a UNIX pipe.",
),
] = False,
):
logger = logging.getLogger("elastic.pipes.core")
if sys.stdin.isatty() and not pipe_mode:
help_message(pipe)
sys.exit(1)
try:
state = receive_state_from_unix_pipe(pipe.logger, pipe.default)
pipes = get_pipes(state)
except Error as e:
fatal(e)
configs = [c for n, c in pipes if n == pipe.name]
config = configs[0] if configs else {}
with ExitStack() as stack:
try:
pipe.run(config, state, dry_run, logger, stack)
except Error as e:
pipe.logger.critical(e)
sys.exit(1)
send_state_to_unix_pipe(pipe.logger, state)
typer.run(_main)