pipeline/common/command_runner.py (54 lines of code) (raw):
import os
import re
from shlex import join
import shlex
import subprocess
def _get_indented_command_string(command_parts: list[str]) -> str:
    """
    Format a command for logging, with every `--flag` (and the values that follow
    it) on its own indented line so that long commands are easy to read.

    Each argument is shell-quoted on its own and the lines are assembled from whole
    arguments. An argument that needs quoting, such as "hello --world", is therefore
    never broken across two lines, even though its text looks like it contains a flag.

    Args:
        command_parts: The command and its arguments, one string per argument.

    Returns:
        The formatted command. The first line holds everything up to the first
        flag; each following line is one flag and its values, indented.
        An empty command yields an empty string.

    Example:
        ["python", "train.py", "--input", "a", "b", "--verbose"] becomes
            python train.py
             --input a b
             --verbose
    """
    # Every group is the list of quoted arguments that share one output line.
    groups: list[list[str]] = [[]]

    for index, part in enumerate(command_parts):
        quoted = shlex.quote(part)
        # Test the quoted form: an argument that had to be wrapped in quotes no
        # longer starts with `--`, and is treated as a value rather than a flag.
        # The first argument always stays on the first line.
        if index > 0 and re.match(r"--\w", quoted):
            groups.append([quoted])
        else:
            groups[-1].append(quoted)

    first_line, *flag_lines = (" ".join(group) for group in groups)
    return "\n".join([first_line, *(f" {line}" for line in flag_lines)])
def apply_command_args(dict: dict[str, any]):
    """
    Lazily turn a dictionary of options into command line arguments.

    Every key is emitted as a `--key` flag, followed by its value(s) converted with
    `str`. A value of `None` produces the bare flag only, and a list or tuple
    produces one argument per item.

    input: { "key": "value" }
    output: "--key value"

    input: { "inputs": ["valueA", "valueB"] }
    output: "--inputs valueA valueB"

    input: { "verbose": None }
    output: "--verbose"
    """
    for flag_name, raw_value in dict.items():
        yield f"--{flag_name}"

        # Normalize the value into the sequence of items that follow the flag.
        if raw_value is None:
            flag_values = ()
        elif isinstance(raw_value, (list, tuple)):
            flag_values = raw_value
        else:
            flag_values = (raw_value,)

        for item in flag_values:
            yield str(item)
def run_command_pipeline(
commands: list[list[str]], pipe_stderr=False, capture=False, logger=None
) -> str | None:
"""
Executes a series of shell commands in a pipeline, where the output of one command
is piped to the next. Optionally captures the final output or logs the pipeline
process. It raises `CalledProcessError` if any command in the pipeline fails.
Args:
commands: A list of command arguments where each command is
represented as a list of strings.
pipe_stderr: If True, pipes `stderr` of each command into `stdout`.
capture: If True, captures and returns the output of the final command in the
pipeline. If False, output is printed to stdout. Defaults to False.
logger: A logger instance used for logging the command execution. If provided,
it will log the constructed pipeline commands. Defaults to None.
Example:
python_scripts = run_command_pipeline(
[
["ls", "-l"],
["grep", ".py"],
["sort"]
],
capture=True
)
"""
if pipe_stderr:
joiner = "2>&1 |"
else:
joiner = "|"
if logger:
# Log out a nice representation of this command.
final_command = _get_indented_command_string(commands[0])
for command_parts in commands[1:]:
final_command = (
f"{final_command}\n{joiner} {_get_indented_command_string(command_parts)}"
)
logger.info("Running:")
for line in final_command.split("\n"):
logger.info(line)
command_string = f" {joiner} ".join([shlex.join(command) for command in commands])
if capture:
return subprocess.check_output(command_string, shell=True).decode("utf-8")
subprocess.check_call(command_string, shell=True)
def run_command(
command: list[str], capture=False, shell=False, logger=None, env=None
) -> str | None:
"""
Runs a command and outputs a nice representation of the command to a logger, if supplied.
Args:
command: The command arguments provided to subprocess.check_call
capture: If True, captures and returns the output of the final command in the
pipeline. If False, output is printed to stdout.
logger: A logger instance used for logging the command execution. If provided,
it will log the pipeline commands.
env: The environment object.
Example:
directory_listing = run_command(
["ls", "-l"],
capture=True
)
"""
# Expand any environment variables.
command = [os.path.expandvars(part) for part in command]
if logger:
# Log out a nice representation of this command.
logger.info("Running:")
for line in _get_indented_command_string(command).split("\n"):
logger.info(line)
if capture:
return subprocess.check_output(command, env=env).decode("utf-8")
subprocess.check_call(command, env=env)