in labgraph/runners/process_manager.py [0:0]
def _terminate_gracefully(self) -> None:
    """
    Terminates the managed processes gracefully.

    Moves the manager into the `STOPPING` phase and then polls, sleeping
    `MONITOR_SLEEP_TIME` between checks, until either every managed process is
    reported dead or `self._shutdown_period` has elapsed. If all processes die in
    time, the manager is marked `TERMINATED`. Otherwise the processes that are
    still alive are recorded in `self._hanged_processes` and termination is handed
    over to `_terminate_forcibly`.
    """
    state = self._state

    # Nothing left to do once the manager has already reached TERMINATED
    if state.get(self._name) == ProcessPhase.TERMINATED:
        return

    logger.debug(f"{self._name}:terminating gracefully")

    # Entering STOPPING is the cue for the managed processes: they are expected
    # to notice it and shut themselves down, which is what makes this graceful.
    # The phase is only written if the manager is not in STOPPING already.
    with state.lock:
        if state.get(self._name) != ProcessPhase.STOPPING:
            state.update(self._name, ProcessPhase.STOPPING)

    started_at = time.perf_counter()

    # Keep polling for as long as at least one managed process is still alive
    while len(self._get_dead_processes()) != len(self._processes):
        elapsed = time.perf_counter() - started_at
        if elapsed >= self._shutdown_period:
            # Out of time: whatever has not died by now is considered hanged
            still_alive = set(self._processes.keys()).difference(
                self._get_dead_processes()
            )
            for hanged_name in still_alive:
                self._hanged_processes.add(hanged_name)
            logger.warning(f"{self._name}:graceful termination timed out")

            # The polite request did not work; fall back to forcible termination
            self._terminate_forcibly()
            return

        time.sleep(MONITOR_SLEEP_TIME)

    # Every process is dead, so the manager itself is now terminated
    with state.lock:
        state.update(self._name, ProcessPhase.TERMINATED)

        # Look for processes that crashed while stopping, unless an exception
        # has already been recorded for this manager
        if state.get_exception(self._name) is None:
            self._check_crashed_processes()
    logger.debug(f"{self._name}:terminated gracefully")