def close()

in images/airflow/2.10.3/python/mwaa/celery/task_monitor.py [0:0]


    def close(self) -> None:
        """
        Emit shutdown metrics and close the task monitor's shared memory blocks.

        This will be called when the worker has been terminated. It stops task
        consumption via ``pause_task_consumption()``, reports metrics describing the
        state of the worker at shutdown (activation timeout, termination timeout and
        the number of tasks still running), and then closes the shared memory blocks
        in order to avoid receiving a warning message in the logs. Calling it again
        after a successful close is a no-op.

        We are not calling unlink here intentionally because both the task monitor
        and the Celery SQS channel reference these blocks and, when the worker is
        shutting down, we do not control which process will be killed first. Calling
        unlink may result in a warning message showing up in the customer side logs,
        causing unnecessary confusion.
        """
        if self.closed:
            # Already closed.
            return

        logger.info("Closing task monitor...")
        self.pause_task_consumption()

        # Report a metric about whether the worker was never activated before its activation time limit was breached.
        activation_breached = self.is_activation_wait_time_limit_breached()
        self.stats.incr(
            "mwaa.task_monitor.worker_shutdown_activation_timeout",
            1 if activation_breached else 0,
        )

        # Report a metric about whether the worker was not able to finish all its tasks before its termination time
        # limit was breached. The check is evaluated exactly once and reused below so that this metric and the
        # "unexpected interruption" decision can never disagree within a single shutdown (the check is presumably
        # clock-based, in which case two separate calls could return different answers).
        termination_breached = self.is_termination_time_limit_breached()
        self.stats.incr(
            "mwaa.task_monitor.worker_shutdown_termination_timeout",
            1 if termination_breached else 0,
        )

        # Report a metric about the number of current tasks at shutdown, and a warning in case this is greater than zero.
        interrupted_task_count = self.get_current_task_count()
        if interrupted_task_count > 0:
            logger.warning("There are non-zero ongoing tasks.")
        self.stats.incr("mwaa.task_monitor.interrupted_tasks_at_shutdown", interrupted_task_count)

        if self.mwaa_signal_handling_enabled:
            # If the worker was marked for killing, or was marked for termination and the allowed time limit for
            # termination has been breached, then these interruptions are expected; the "unexpected" metric is
            # still emitted, with a value of zero, to signify that.
            interruption_expected = self.marked_for_kill or termination_breached
            unexpected_interrupted_task_count = 0 if interruption_expected else interrupted_task_count
            if unexpected_interrupted_task_count > 0:
                logger.warning("Worker was not shutdown via expected methods and some tasks were interrupted.")
            self.stats.incr(
                "mwaa.task_monitor.unexpected_interrupted_tasks_at_shutdown",
                unexpected_interrupted_task_count,
            )

        # Close (but deliberately do not unlink, see docstring) the shared memory objects.
        self.celery_state.close()
        self.celery_work_consumption_block.close()
        self.cleanup_celery_state.close()

        # Only mark the monitor as closed once every block has been closed successfully.
        self.closed = True

        logger.info("Task monitor closed.")