core/import.py (61 lines of code) (raw):
# Copyright 2025 Elasticsearch B.V.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Elastic Pipes component to import data into the Pipes state."""
import sys
from logging import Logger
from pathlib import Path
from typing_extensions import Annotated, Any
from . import Pipe
from .util import deserialize, fatal, warn_interactive
class Ctx(Pipe.Context):
base_dir: Annotated[
str,
Pipe.State("runtime.base-dir"),
] = str(Path.cwd())
file_name: Annotated[
str,
Pipe.Config("file"),
Pipe.Help("file containing the source data"),
Pipe.Notes("default: standard input"),
] = None
format: Annotated[
str,
Pipe.Config("format"),
Pipe.Help("data format of the file content (ex. yaml, json, ndjson)"),
Pipe.Notes("default: guessed from the file name extension"),
] = None
state: Annotated[
Any,
Pipe.State(None, indirect="node", mutable=True),
Pipe.Help("state node destination of the data"),
Pipe.Notes("default: whole state"),
]
interactive: Annotated[
bool,
Pipe.Config("interactive"),
Pipe.Help("allow importing data from the terminal"),
] = False
@Pipe("elastic.pipes.core.import")
def main(ctx: Ctx, log: Logger, dry_run: bool):
"""Import data from file or standard input."""
format = ctx.format
if format is None:
if ctx.file_name:
format = Path(ctx.file_name).suffix.lower()[1:]
log.debug(f"import file format guessed from file extension: {format}")
else:
format = "yaml"
log.debug(f"assuming import file format: {format}")
if not ctx.file_name and sys.stdin.isatty() and not ctx.interactive:
fatal("To use `elastic.pipes.core.import` interactively, set `interactive: true` in its configuration.")
if dry_run:
return
node = ctx.get_binding("state").node
msg_state = "everything" if node is None else f"'{node}'"
msg_file_name = f"'{ctx.file_name}'" if ctx.file_name else "standard input"
log.info(f"importing {msg_state} from {msg_file_name}...")
if ctx.file_name:
with open(Path(ctx.base_dir) / ctx.file_name, "r") as f:
warn_interactive(f)
ctx.state = deserialize(f, format=format) or {}
else:
warn_interactive(sys.stdin)
ctx.state = deserialize(sys.stdin, format=format) or {}
if __name__ == "__main__":
main()