in labgraph/runners/process_manager.py [0:0]
def _wait_for_startup_phase(self, target_phase: ProcessPhase) -> None:
    """
    Waits for all processes to enter the same phase during startup. Kills all
    processes if they crash, raise, or hang.

    Args:
        target_phase: The `ProcessPhase` every managed process (excluding
            this manager itself) must reach before this call returns.

    Polls the shared state under `self._state.lock`, sleeping
    `MONITOR_SLEEP_TIME` between iterations. On success, records the minimum
    phase among the other processes as this manager's own phase. If a
    process crashes, raises, or fails to reach `target_phase` within
    `self._startup_period` seconds of `self._start_time`, calls
    `self._terminate_gracefully()` instead.

    NOTE(review): leading indentation was lost in extraction; the nesting
    below is reconstructed from the code's control flow — confirm against
    the original file.
    """
    # If this manager is already at or past the target phase, startup is out
    # of order — shut everything down rather than wait.
    if self._state.get(self._name).value >= target_phase.value:
        self._terminate_gracefully()
        return
    should_terminate = False  # Flag that can be set to kill the manager
    logger.debug(
        f"{self._name}:waiting for all processes to be {target_phase.name}"
    )
    while True:
        # Hold the state lock so the crash/exception/phase checks below all
        # observe a consistent snapshot of the shared state.
        with self._state.lock:
            # Check if any process has crashed
            self._check_crashed_processes()
            if len(self._crashed_processes) > 0:
                should_terminate = True
            # Check if any process raised an exception
            if self._state.has_exception:
                should_terminate = True
            phases = self._state.get_all()
            # Stop waiting if all the processes have entered the right state
            # (this manager's own entry is excluded from the check)
            if all(
                phase.value >= target_phase.value
                for name, phase in phases.items()
                if name != self._name
            ):
                # Record the slowest (minimum) phase among the other
                # processes as this manager's own overall phase.
                overall_phase = ProcessPhase(
                    min(
                        phase.value
                        for name, phase in phases.items()
                        if name != self._name
                    )
                )
                if self._state.get_overall() != overall_phase:
                    self._state.update(self._name, overall_phase)
                break
            # Check if any processes took too long to enter the state
            slow_processes = {
                process_name: phase
                for process_name, phase in phases.items()
                if phase.value < target_phase.value and process_name != self._name
            }
            current_time = time.perf_counter()
            # _start_time is presumably set when startup begins, before this
            # method is called — TODO confirm against the caller.
            assert self._start_time is not None
            if (
                current_time - self._start_time >= self._startup_period
                and len(slow_processes) > 0
            ):
                error = (
                    f"{self._name}:modules took too long to be {target_phase.name}:\n"
                )
                # Report every slow process and mark each one as hanged.
                for name, phase in slow_processes.items():
                    error += f"- {name}: {phase.name}\n"
                    self._hanged_processes.add(name)
                logger.error(error)
                should_terminate = True
        if should_terminate:
            break
        time.sleep(MONITOR_SLEEP_TIME)
    # Terminate if the termination flag was set
    if should_terminate:
        self._terminate_gracefully()
        return
    logger.debug(f"{self._name}:processes are all {target_phase.name}")