def run_subprocesses()

in images/airflow/2.10.1/python/mwaa/subprocess/subprocess.py [0:0]


def run_subprocesses(subprocesses: List[Subprocess]):
    """
    Run the given subprocesses in parallel.

    This utility function is useful if you want to have multiple processes running in
    parallel and you want to make sure that you ingest logs from all of them in
    parallel. This works by calling the start() method of each subprocess with a False
    value for the auto_enter_execution_loop parameter. This will result in starting
    the process but not monitoring its logs. The caller would then need to manually
    drive each process, which is what we do here by calling execution_loop_iter() on
    every running process roughly once per second.
    When a subprocess marked as essential exits, all other subprocesses will be shutdown.

    Log capture is started for every subprocess right after it is started, and is
    finished for every subprocess once the monitoring loop ends.

    :param subprocesses: A list of Subprocess objects to run in parallel.
    """
    for s in subprocesses:
        s.start(False)  # False since we want to run the subprocesses in parallel
        s.start_log_capture()

    running_processes = subprocesses
    while running_processes:
        # Use the monotonic clock for pacing: the wall clock (time.time()) can be
        # stepped backwards, which would make the elapsed time negative and the
        # sleep below arbitrarily long.
        start_time = time.monotonic()

        # A falsy return value from execution_loop_iter() means the process is done.
        finished_processes: List[Subprocess] = [
            s for s in running_processes if not s.execution_loop_iter()
        ]

        # Remove finished processes from the list of running processes.
        running_processes = [s for s in running_processes if s not in finished_processes]

        finished_essential_processes = [
            s for s in finished_processes if s.is_essential
        ]

        if finished_essential_processes:
            names = [str(p) for p in finished_essential_processes]
            module_logger.warning(
                f"The following essential process(es) exited: {', '.join(names)}. "
                "Terminating other subprocesses..."
            )
            for s in running_processes:
                s.shutdown()
            break

        if not running_processes:
            # Everything has exited; no point in sleeping before wrapping up.
            break

        # Pace the loop so that each pass takes at least one second overall.
        dt = time.monotonic() - start_time
        time.sleep(max(1.0 - dt, 0))

    for s in subprocesses:
        s.finish_log_capture()