in dags/map_reproducibility/utils/sample_workload_utils.py [0:0]
def execute_workload_commands(commands: list, cwd: str) -> Tuple[bool, list]:
    """Run a batch of shell commands as a single bash script.

    The commands are joined with ';' and handed to one ``bash -c``
    invocation, so they run one after another in the same shell. Because of
    the ';' separator a failing command does not stop the ones that follow,
    and the reported exit code is whatever bash returns for the combined
    script.

    Args:
      commands: Shell command strings to run, in order.
      cwd: Directory in which the bash process is started.

    Returns:
      A ``(success, results)`` tuple. ``success`` is True when the bash
      process exits with code 0. ``results`` is a one-element list holding a
      dict with the keys ``command`` (the joined script), ``stdout``,
      ``stderr``, ``output`` (stdout, followed by a "STDERR:" section when
      stderr is non-empty) and ``exit_code``.
    """
    logger.info(f"Executing commands: {commands} in directory: {cwd}")

    # One script, one shell: state such as `cd` carries over between commands.
    script = ";".join(commands)
    completed = subprocess.run(
        ["bash", "-c", script],
        capture_output=True,
        text=True,
        cwd=cwd,
    )
    out, err = completed.stdout, completed.stderr
    succeeded = completed.returncode == 0

    # Only append the STDERR section when there is something to show.
    if err:
        merged_output = out + "\n\nSTDERR:\n" + err
    else:
        merged_output = out

    # Report what happened before handing the results back.
    if out:
        logger.info(f"Stdout for combined commands:\n{out}")
    if err:
        logger.warning(f"Stderr for combined commands:\n{err}")
    if not succeeded:
        logger.error("Command execution failed")

    # The whole batch is reported as a single entry.
    outcome = {
        "command": script,
        "stdout": out,
        "stderr": err,
        "output": merged_output,
        "exit_code": completed.returncode,
    }
    return succeeded, [outcome]