def _check()

in images/airflow/2.9.2/python/mwaa/subprocess/conditions.py [0:0]


    def _check(self, process_status: ProcessStatus) -> ProcessConditionResponse:
        """
        Inspect the worker task monitor and decide whether the worker should exit.

        While the process is running, abandoned resources are cleaned up and the
        next pending signal is processed before the exit rules are evaluated in
        this order: activation wait limit breached, marked for kill, marked for
        termination, and finally idle while ``terminate_if_idle`` is set. The
        first rule that demands an exit produces a failed response.

        Metrics are published only when no failed response is returned.

        :param process_status: The current status of the monitored process.

        :returns A failed ProcessConditionResponse (built via
            ``_get_failed_condition_response``) when the worker should exit,
            otherwise a successful ProcessConditionResponse.
        """
        # Message describing why the worker must exit; None means "keep going".
        exit_reason = None
        is_running = process_status == ProcessStatus.RUNNING

        if not is_running:
            logger.info(
                f"Worker process finished (status is {process_status.name}). "
                "No need to monitor tasks anymore."
            )
        else:
            monitor = self.worker_task_monitor
            monitor.cleanup_abandoned_resources()
            monitor.process_next_signal()

            if monitor.is_activation_wait_time_limit_breached():
                # Waiting for the activation signal took too long: give up.
                exit_reason = "Allowed time limit for activation has been breached. Exiting"
            elif monitor.is_marked_for_kill():
                # A kill request does not wait for in-flight tasks to complete.
                exit_reason = "Worker has been marked for kill. Exiting."
            elif monitor.is_marked_for_termination():
                # A termination request is graceful: exit once the worker is
                # idle, or once the graceful termination time limit is breached.
                logger.info("Worker has been marked for termination, checking for idleness before terminating.")
                if monitor.is_worker_idle():
                    exit_reason = "Worker marked for termination has become idle. Exiting."
                elif monitor.is_termination_time_limit_breached():
                    exit_reason = "Allowed time limit for graceful termination has been breached. Exiting"
                else:
                    logger.info("Worker marked for termination is NOT yet idle. Waiting.")
            elif self.terminate_if_idle and monitor.is_worker_idle():
                # The worker looked idle: pause task consumption via Celery and
                # check idleness a second time before deciding to exit.
                logger.info("Worker process is idle and needs to be terminated. Pausing task consumption.")
                monitor.pause_task_consumption()
                if monitor.is_worker_idle():
                    exit_reason = (
                        "Worker which should be terminated if idle has been "
                        "found to be idle. Exiting."
                    )
                else:
                    # The second check found the worker busy: undo the pause
                    # and reset the monitor state.
                    logger.info("Worker picked up new tasks during shutdown, reviving worker.")
                    monitor.resume_task_consumption()
                    monitor.reset_monitor_state()
            else:
                logger.info("Worker process is either NOT idle or has not been marked for termination. No action is needed.")

        # Failure responses are returned before metrics publication, so metrics
        # are only published on the success path.
        if exit_reason is not None:
            return self._get_failed_condition_response(exit_reason)

        self._publish_metrics()

        # Every remaining scenario reports success.
        return ProcessConditionResponse(
            condition=self,
            successful=True,
        )