def __init__()

in images/airflow/2.10.3/python/mwaa/celery/sqs_broker.py


    def __init__(self, *args, **kwargs):
        if boto3 is None:
            raise ImportError("boto3 is not installed")
        super().__init__(*args, **kwargs)
        self._validate_predifined_queues()

        # SQS blows up if you try to create a new queue when one already
        # exists but with a different visibility_timeout.  This prepopulates
        # the queue_cache to protect us from recreating
        # queues that are known to already exist.
        self._update_queue_cache(self.queue_name_prefix)

        self.hub = kwargs.get("hub") or get_event_loop()

        # MWAA__CORE__TASK_MONITORING_ENABLED is set to 'true' for workers where we want to monitor the count of tasks
        # currently being executed on the worker. This is used to determine whether idle worker checks should be enabled.
        self.idle_worker_monitoring_enabled = (
            os.environ.get("MWAA__CORE__TASK_MONITORING_ENABLED", "false") == "true"
        )
        if self.idle_worker_monitoring_enabled:
            logger.info('Idle worker monitoring will be enabled because '
                        'MWAA__CORE__TASK_MONITORING_ENABLED is set to true.')

        # These are the shared memory blocks which the Worker Task Monitor and the Celery SQS Channel use to share the
        # internal state of the current workload across the two processes.
        # 'celery_state' is maintained by the SQS channel and has information about the current in-flight tasks.
        celery_state_block_name = f'celery_state_{os.environ.get("AIRFLOW_ENV_ID", "")}'
        self.celery_state = (
            shared_memory.SharedMemory(name=celery_state_block_name)
            if self.idle_worker_monitoring_enabled
            else None
        )
        # Create a shared memory block which the Worker Task Monitor and the Celery SQS Channel will use to toggle a
        # flag that tells the Celery SQS channel to pause/unpause further consumption of available SQS messages.
        # The flag is maintained by the Worker Task Monitor.
        celery_work_consumption_block_name = (
            f'celery_work_consumption_{os.environ.get("AIRFLOW_ENV_ID", "")}'
        )
        self.celery_work_consumption_flag_block = (
            shared_memory.SharedMemory(name=celery_work_consumption_block_name)
            if self.idle_worker_monitoring_enabled
            else None
        )
        # 'cleanup_celery_state' is maintained by the Worker Task Monitor and has information about the current in-flight
        # tasks which need to be cleaned up from 'celery_state'. A second block is used because the Worker Task Monitor
        # cannot write into 'celery_state': if it updated 'celery_state' directly, concurrent changes made by the Worker
        # Task Monitor and the SQS channel could overwrite one another.
        cleanup_celery_state_block_name = (
            f'cleanup_celery_state_{os.environ.get("AIRFLOW_ENV_ID", "")}'
        )
        self.cleanup_celery_state = (
            shared_memory.SharedMemory(name=cleanup_celery_state_block_name)
            if self.idle_worker_monitoring_enabled
            else None
        )
        self.celery_lock = Lock() if self.idle_worker_monitoring_enabled else None
        # If celery fails to remove the message from the queue but the associated airflow process has wrapped up, then the
        # message details will be stuck in the memory blocks defined above. This is the abandoned SQS messages scenario,
        # and this flag determines whether we intentionally create this scenario for testing purposes.
        self.abandoned_messages_test_enabled = (
            os.environ.get(
                "AIRFLOW__MWAA__TEST_ABANDONED_SQS_MESSAGE_SCENARIOS", "false"
            )
            == "true"
            and self.idle_worker_monitoring_enabled
        )
        # If celery removes the message from the queue but the associated airflow process has not wrapped up, then the
        # process will continue to consume worker resources. This is the undead Airflow processes scenario, and this flag
        # determines whether we intentionally create this scenario for testing purposes.
        self.undead_processes_test_enabled = (
            os.environ.get("AIRFLOW__MWAA__TEST_UNDEAD_PROCESS_SCENARIOS", "false")
            == "true"
            and self.idle_worker_monitoring_enabled
        )
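
Note: shared_memory.SharedMemory(name=...) without create=True only attaches to a block that already exists, so the
blocks referenced above are expected to be created elsewhere (e.g. by the Worker Task Monitor) before this channel
starts. The sketch below is not MWAA code; it only illustrates the create/attach/cleanup semantics these blocks rely
on. The block size, the 0/1 flag convention, and the "example-env" value are assumptions made for illustration.

    # Minimal sketch (assumed names and sizes): create vs. attach for a named shared memory block.
    from multiprocessing import shared_memory

    env_id = "example-env"  # stands in for os.environ.get("AIRFLOW_ENV_ID", "")

    # Creator side (e.g. the Worker Task Monitor): allocates the named block up front.
    flag_block = shared_memory.SharedMemory(
        name=f"celery_work_consumption_{env_id}", create=True, size=1
    )
    flag_block.buf[0] = 0  # 0 = keep consuming SQS messages, 1 = pause (assumed convention)

    # Attaching side (e.g. this Channel): opens the existing block by name only, as in __init__ above.
    attached = shared_memory.SharedMemory(name=f"celery_work_consumption_{env_id}")
    paused = attached.buf[0] == 1
    print("consumption paused:", paused)

    # Cleanup: every process closes its own handle; only the creator unlinks the block.
    attached.close()
    flag_block.close()
    flag_block.unlink()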