hcp/vault/write.py (37 lines of code) (raw):

#!/usr/bin/env python3

"""Pipe that writes a state node to an HCP Vault path."""

import sys
from logging import Logger

import hvac
from elastic.pipes.core import Pipe
from typing_extensions import Annotated

from .common import Context


@Pipe("elastic.pipes.hcp.vault.write")
def main(
    log: Logger,
    ctx: Context,
    path: Annotated[
        str,
        Pipe.Config("path"),
        Pipe.Help("Vault path destination of the data"),
    ],
    vault: Annotated[
        dict,
        Pipe.State("vault", mutable=True),
        Pipe.Help("state node containing the source data"),
    ],
):
    """Write data to an HCP Vault instance.

    Args:
        log: Logger used for progress and error messages.
        ctx: Connection context; its ``url`` and ``token`` attributes are
            passed to the Vault client.
        path: Vault path the data is written to (pipe config ``path``).
        vault: Data to write, taken from the ``vault`` state node.

    Raises:
        SystemExit: With status 1 when the client is not authenticated,
            when the authentication check or the write raises, or when
            the write returns ``None``.
    """

    log.info(f"connect to '{ctx.url}'")
    vc = hvac.Client(url=ctx.url, token=ctx.token)

    # Keep the try body down to the single call that can raise.
    try:
        authenticated = vc.is_authenticated()
    except Exception as e:
        log.error(e)
        sys.exit(1)

    if not authenticated:
        log.error("Vault could not authenticate")
        sys.exit(1)

    log.info(f"write to path '{path}'")

    # Report write failures the same way as authentication failures
    # instead of letting the client exception escape as a traceback.
    try:
        res = vc.write_data(path, data=vault)
    except Exception as e:
        log.error(f"could not write path: '{path}': {e}")
        sys.exit(1)

    if res is None:
        log.error(f"could not write path: '{path}'")
        sys.exit(1)


if __name__ == "__main__":
    main()