in images/airflow/2.10.3/python/mwaa/celery/task_monitor.py [0:0]
def close(self):
"""
This will be called when the worker has been terminated. We close these blocks
in order to avoid receiving a warning message in the logs. We are not calling
unlink here intentionally because both the task monitor and the Celery SQS
channel references these blocks and when the worker is shutting down, we do not
control which process will be killed first. Calling unlink may result in a
warning message showing up in the customer side logs causing unnecessary
confusion.
"""
if self.closed:
# Already closed.
return
logger.info("Closing task monitor...")
self.pause_task_consumption()
# Report a metric about whether the worker was never activated before its activation time limit was breached.
activation_timeout_metric = 1 if self.is_activation_wait_time_limit_breached() else 0
self.stats.incr("mwaa.task_monitor.worker_shutdown_activation_timeout", activation_timeout_metric)
# Report a metric about whether the worker was not able to finish all its tasks before its termination time limit was breached.
termination_timeout_metric = 1 if self.is_termination_time_limit_breached() else 0
self.stats.incr("mwaa.task_monitor.worker_shutdown_termination_timeout", termination_timeout_metric)
# Report a metric about the number of current task at shutdown, and a warning in case this is greater than zero.
interrupted_task_count = self.get_current_task_count()
if interrupted_task_count > 0:
logger.warning("There are non-zero ongoing tasks.")
self.stats.incr(f"mwaa.task_monitor.interrupted_tasks_at_shutdown", interrupted_task_count)
if self.mwaa_signal_handling_enabled:
# If the worker was marked for killing or was marked for termination and the allowed time limit for termination
# has been breached, then these interruptions are expected and another metric is also emitted to signify that.
unexpected_interrupted_task_count = 0 \
if self.marked_for_kill or self.is_termination_time_limit_breached() else interrupted_task_count
if unexpected_interrupted_task_count > 0:
logger.warning("Worker was not shutdown via expected methods and some tasks were interrupted.")
self.stats.incr(f"mwaa.task_monitor.unexpected_interrupted_tasks_at_shutdown", unexpected_interrupted_task_count)
# Close shared memory objects.
self.celery_state.close()
self.celery_work_consumption_block.close()
self.cleanup_celery_state.close()
self.closed = True
logger.info("Task monitor closed.")