in images/airflow/2.9.2/python/mwaa/subprocess/subprocess.py [0:0]
def run_subprocesses(subprocesses: List[Subprocess]) -> None:
    """
    Run the given subprocesses in parallel.

    This utility function is useful if you want to have multiple processes running in
    parallel and you want to make sure that you ingest logs from all of them in
    parallel. This works by calling the start() method of each subprocess with a False
    value for the auto_enter_execution_loop parameter. This will result in starting
    the process but not monitoring its logs. Monitoring is then driven from here:
    execution_loop_iter() is called on every still-running subprocess about once per
    second, and a subprocess is considered finished as soon as that call returns a
    falsy value.

    Log capture is started for each subprocess right after it is started, and
    finished for all of them once the monitoring loop ends.

    When a subprocess marked as essential exits, all other subprocesses will be
    shutdown.

    :param subprocesses: A list of Subprocess objects to run in parallel.
    """
    for s in subprocesses:
        s.start(False)  # False since we want to run the subprocesses in parallel
        s.start_log_capture()

    running_processes = subprocesses
    while running_processes:
        # A monotonic clock is used for pacing: with the wall clock, a backward
        # time adjustment would yield a negative elapsed time and therefore an
        # arbitrarily long sleep during which no logs are ingested.
        start_time = time.monotonic()

        # Run one monitoring iteration per subprocess and split them into those
        # that are still alive and those that just finished.
        still_running: List[Subprocess] = []
        finished_processes: List[Subprocess] = []
        for s in running_processes:
            if s.execution_loop_iter():
                still_running.append(s)
            else:
                finished_processes.append(s)
        running_processes = still_running

        finished_essential_processes = [
            s for s in finished_processes if s.is_essential
        ]
        if finished_essential_processes:
            names = [str(p) for p in finished_essential_processes]
            module_logger.warning(
                f"The following essential process(es) exited: {', '.join(names)}. "
                "Terminating other subprocesses..."
            )
            for s in running_processes:
                s.shutdown()
            break

        if not running_processes:
            # Nothing left to poll, so there is no reason to wait out the rest
            # of the one-second interval before returning.
            break

        # Pace the loop to roughly one iteration per second.
        dt = time.monotonic() - start_time
        time.sleep(max(1.0 - dt, 0))

    # NOTE(review): this finalization is skipped if anything above raises;
    # confirm whether finish_log_capture() is safe to call while subprocesses
    # are still running before wrapping the loop in try/finally.
    for s in subprocesses:
        s.finish_log_capture()