def _on_messages_ready()

in images/airflow/2.10.1/python/mwaa/celery/sqs_broker.py [0:0]


    def _on_messages_ready(self, queue, qname, messages):
        if "Messages" in messages and messages["Messages"]:
            callbacks = self.connection._callbacks

            # 2022-11-25: Amazon addition.
            # This code is called after messages are retrieved from SQS via
            # the async client. From our perspective, this indicates tasks
            # pulled from the queue, so we report a metric.
            Stats.incr("mwaa.celery.task_pulled", len(messages["Messages"]))
            celery_task_tuples = []

            for msg in messages["Messages"]:
                celery_task_tuples.append(
                    (
                        self._get_task_command_from_sqs_message(msg["Body"]),
                        msg["ReceiptHandle"],
                    )
                )
                msg_parsed = self._message_to_python(msg, qname, queue)
                callbacks[qname](msg_parsed)

            should_add_all_messages = True
            if self.undead_processes_test_enabled and len(celery_task_tuples) > 1:
                # In order to test for undead processes, we will not add the data regarding the first message fetched from the queue
                # in 50% of the cases. This will cause the cleanup process to treat the corresponding Airflow process as undead and that
                # Airflow process will be terminated.
                import random

                if random.randint(0, 9) < 5:
                    should_add_all_messages = False

            if should_add_all_messages:
                self._update_state_with_tasks(
                    celery_task_tuples, self.CeleryStateUpdateAction.ADD
                )
            else:
                self._update_state_with_tasks(
                    celery_task_tuples[1:], self.CeleryStateUpdateAction.ADD
                )