in images/airflow/2.10.3/python/mwaa/celery/sqs_broker.py [0:0]
def _on_messages_ready(self, queue, qname, messages):
    """Dispatch a batch of received SQS messages and track them as tasks.

    For every entry under ``messages["Messages"]`` the task command is
    extracted from the message ``"Body"``, paired with the message's
    ``"ReceiptHandle"``, and the message is converted with
    ``_message_to_python`` and handed to the callback registered for
    ``qname``. Once all messages are dispatched, the collected
    ``(command, receipt_handle)`` pairs are passed to
    ``_update_state_with_tasks`` with the ``ADD`` action.

    Args:
        queue: Forwarded unchanged to ``_message_to_python``.
        qname: Queue name; selects the callback in
            ``self.connection._callbacks`` and is forwarded to
            ``_message_to_python``.
        messages: Mapping that may hold a ``"Messages"`` sequence. When
            the key is absent or its value is empty, nothing happens.
    """
    # Nothing was received: no metric, no dispatch, no state update.
    if "Messages" not in messages or not messages["Messages"]:
        return

    handlers = self.connection._callbacks
    received = messages["Messages"]

    # 2022-11-25: Amazon addition.
    # We get here once the async client has fetched messages from SQS,
    # which we count as tasks pulled from the queue, hence the metric.
    Stats.incr("mwaa.celery.task_pulled", len(received))

    task_entries = []
    for sqs_message in received:
        # Record the (command, receipt handle) pair before dispatching,
        # so the bookkeeping entry exists even if the callback raises.
        command = self._get_task_command_from_sqs_message(sqs_message["Body"])
        receipt_handle = sqs_message["ReceiptHandle"]
        task_entries.append((command, receipt_handle))

        parsed = self._message_to_python(sqs_message, qname, queue)
        handlers[qname](parsed)

    drop_first_entry = False
    if self.undead_processes_test_enabled and len(task_entries) > 1:
        # Undead-process test mode: in 50% of the cases the first fetched
        # message is deliberately left out of the recorded state. The
        # cleanup process then treats the corresponding Airflow process
        # as undead and terminates it.
        import random

        drop_first_entry = random.randint(0, 9) < 5

    tracked_entries = task_entries[1:] if drop_first_entry else task_entries
    self._update_state_with_tasks(
        tracked_entries, self.CeleryStateUpdateAction.ADD
    )