in images/airflow/2.9.2/python/mwaa/subprocess/conditions.py [0:0]
def _check(self, process_status: ProcessStatus) -> ProcessConditionResponse:
"""
Execute the condition and return the response.
:returns A ProcessConditionResponse containing data about the response.
"""
if process_status == ProcessStatus.RUNNING:
self.worker_task_monitor.cleanup_abandoned_resources()
self.worker_task_monitor.process_next_signal()
# If the allowed time limit for waiting for activation signal has been breached, then we give up on further wait and exit.
if self.worker_task_monitor.is_activation_wait_time_limit_breached():
return self._get_failed_condition_response("Allowed time limit for activation has been breached. Exiting")
# If the worker is marked to be killed, then we exit the worker without waiting for the tasks to be completed.
elif self.worker_task_monitor.is_marked_for_kill():
return self._get_failed_condition_response("Worker has been marked for kill. Exiting.")
# If the worker is marked to be terminated, then we exit the worker after waiting for the tasks to be completed.
elif self.worker_task_monitor.is_marked_for_termination():
logger.info("Worker has been marked for termination, checking for idleness before terminating.")
if self.worker_task_monitor.is_worker_idle():
return self._get_failed_condition_response("Worker marked for termination has become idle. Exiting.")
elif self.worker_task_monitor.is_termination_time_limit_breached():
return self._get_failed_condition_response("Allowed time limit for graceful termination has been breached. Exiting")
else:
logger.info("Worker marked for termination is NOT yet idle. Waiting.")
elif self.terminate_if_idle and self.worker_task_monitor.is_worker_idle():
# After detecting worker idleness, we pause further work consumption via
# Celery, wait and check again for idleness.
logger.info("Worker process is idle and needs to be terminated. Pausing task consumption.")
self.worker_task_monitor.pause_task_consumption()
if self.worker_task_monitor.is_worker_idle():
return self._get_failed_condition_response("Worker which should be terminated if idle has been "
"found to be idle. Exiting.")
else:
logger.info("Worker picked up new tasks during shutdown, reviving worker.")
self.worker_task_monitor.resume_task_consumption()
self.worker_task_monitor.reset_monitor_state()
else:
logger.info("Worker process is either NOT idle or has not been marked for termination. No action is needed.")
else:
logger.info(
f"Worker process finished (status is {process_status.name}). "
"No need to monitor tasks anymore."
)
self._publish_metrics()
# For all other scenarios, the condition defaults to returning success.
return ProcessConditionResponse(
condition=self,
successful=True,
)