in images/airflow/2.10.3/python/mwaa/celery/sqs_broker.py [0:0]
def __init__(self, *args, **kwargs):
    """Initialize the SQS channel and, optionally, the idle-worker-monitoring state.

    Raises:
        ImportError: if boto3 is not installed.
        FileNotFoundError: (presumably) if monitoring is enabled but the
            Worker Task Monitor has not yet created the shared-memory
            blocks this attaches to — TODO confirm against the monitor's
            startup ordering.
    """
    if boto3 is None:
        raise ImportError("boto3 is not installed")
    super().__init__(*args, **kwargs)
    # NOTE(review): "predifined" is a typo, but the method is defined
    # elsewhere in this file under that name — renaming must happen there.
    self._validate_predifined_queues()
    # SQS blows up if you try to create a new queue when one already
    # exists but with a different visibility_timeout. This prepopulates
    # the queue_cache to protect us from recreating
    # queues that are known to already exist.
    self._update_queue_cache(self.queue_name_prefix)
    self.hub = kwargs.get("hub") or get_event_loop()
    # MWAA__CORE__TASK_MONITORING_ENABLED is set to 'true' for workers where
    # we want to monitor the count of tasks currently executing on the
    # worker. This determines whether idle-worker checks are enabled.
    self.idle_worker_monitoring_enabled = (
        os.environ.get("MWAA__CORE__TASK_MONITORING_ENABLED", "false") == "true"
    )
    if self.idle_worker_monitoring_enabled:
        logger.info('Idle worker monitoring will be enabled because '
                    'MWAA__CORE__TASK_MONITORING_ENABLED is set to true.')

    # Shared-memory block names are suffixed with the environment id so
    # multiple environments on one host don't collide.
    env_id = os.environ.get("AIRFLOW_ENV_ID", "")

    def _attach_shared_block(prefix):
        # Attach to the existing shared-memory block named
        # f"{prefix}_{env_id}", or return None when monitoring is disabled.
        # SharedMemory(name=...) attaches; it does not create, so the
        # Worker Task Monitor must have created the block already.
        if not self.idle_worker_monitoring_enabled:
            return None
        return shared_memory.SharedMemory(name=f"{prefix}_{env_id}")

    # These shared memory blocks let the Worker Task Monitor and the Celery
    # SQS Channel share the current workload state across the two processes.
    #
    # 'celery_state' is maintained by the SQS channel and has information
    # about the current in-flight tasks.
    self.celery_state = _attach_shared_block("celery_state")
    # 'celery_work_consumption' is maintained by the worker monitor; it
    # signals the toggle of a flag telling the Celery SQS channel to
    # pause/unpause further consumption of available SQS messages.
    self.celery_work_consumption_flag_block = _attach_shared_block(
        "celery_work_consumption"
    )
    # 'cleanup_celery_state' is maintained by the Worker Task Monitor and
    # lists in-flight tasks that need to be cleaned out of 'celery_state'.
    # A second blob is used because the monitor cannot safely write into
    # 'celery_state': concurrent writes from the monitor and the SQS
    # channel could overwrite one another.
    self.cleanup_celery_state = _attach_shared_block("cleanup_celery_state")
    self.celery_lock = Lock() if self.idle_worker_monitoring_enabled else None

    # If celery fails to remove the message from the queue but the associated
    # airflow process has wrapped up, the message details get stuck in the
    # memory blocks above. This is the "abandoned SQS messages" scenario;
    # this flag lets tests create that scenario intentionally.
    self.abandoned_messages_test_enabled = (
        self.idle_worker_monitoring_enabled
        and os.environ.get(
            "AIRFLOW__MWAA__TEST_ABANDONED_SQS_MESSAGE_SCENARIOS", "false"
        )
        == "true"
    )
    # If celery removes the message from the queue but the associated airflow
    # process has not wrapped up, the process keeps eating worker resources.
    # This is the "undead airflow processes" scenario; this flag lets tests
    # create that scenario intentionally.
    self.undead_processes_test_enabled = (
        self.idle_worker_monitoring_enabled
        and os.environ.get("AIRFLOW__MWAA__TEST_UNDEAD_PROCESS_SCENARIOS", "false")
        == "true"
    )