core/export.py (52 lines of code) (raw):

# Copyright 2025 Elasticsearch B.V. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Elastic Pipes component to export data from the Pipes state.""" import sys from logging import Logger from pathlib import Path from typing_extensions import Annotated, Any from . import Pipe from .util import serialize class Ctx(Pipe.Context): base_dir: Annotated[ str, Pipe.State("runtime.base-dir"), ] = str(Path.cwd()) file_name: Annotated[ str, Pipe.Config("file"), Pipe.Help("file destination of the data"), Pipe.Notes("default: standard output"), ] = None format: Annotated[ str, Pipe.Config("format"), Pipe.Help("data format of the file content (ex. yaml, json, ndjson)"), Pipe.Notes("default: guessed from the file name extension"), ] = None state: Annotated[ Any, Pipe.State(None, indirect="node", mutable=True), Pipe.Help("state node containing the source data"), Pipe.Notes("default: whole state"), ] @Pipe("elastic.pipes.core.export") def main(ctx: Ctx, log: Logger, dry_run: bool): """Export data to file or standard output.""" format = ctx.format if format is None: if ctx.file_name: format = Path(ctx.file_name).suffix.lower()[1:] log.debug(f"export file format guessed from file extension: {format}") else: format = "yaml" log.debug(f"assuming export file format: {format}") if dry_run: return node = ctx.get_binding("state").node msg_state = "everything" if node is None else f"'{node}'" msg_file_name = f"'{ctx.file_name}'" if ctx.file_name else "standard output" log.info(f"exporting {msg_state} to {msg_file_name}...") if ctx.file_name: with open(Path(ctx.base_dir) / ctx.file_name, "w") as f: serialize(f, ctx.state, format=format) else: serialize(sys.stdout, ctx.state, format=format) if __name__ == "__main__": main()