in images/airflow/2.10.1/python/mwaa/celery/sqs_broker.py [0:0]
def _message_to_python(self, message, queue_name, queue):
    """Turn a raw SQS message into the parsed payload returned to the caller.

    The message body is passed through ``self._optional_b64_decode`` and
    then parsed with ``loads``. What happens next depends on whether
    ``queue_name`` is listed in ``self._noack_queues``:

    * No-ack queue: a ``mwaa.celery.task_pulled`` metric is emitted, the
      task is recorded through ``self._update_state_with_tasks`` and the
      message is deleted from the queue right away.
    * Any other queue: the payload's ``delivery_info`` is extended with
      the SQS message and queue, and the receipt handle is stored as the
      ``delivery_tag`` property.

    Args:
        message: The SQS message; its ``"Body"`` and ``"ReceiptHandle"``
            entries are read here.
        queue_name: Name of the queue the message was received from.
        queue: Stored under ``delivery_info["sqs_queue"]`` for regular
            queues. Not used for no-ack queues, where the value returned
            by ``self._new_queue(queue_name)`` is used instead.

    Returns:
        The parsed payload.
    """
    decoded_body = self._optional_b64_decode(message["Body"].encode())
    payload = loads(bytes_to_str(decoded_body))

    if queue_name in self._noack_queues:
        noack_queue = self._new_queue(queue_name)
        # 2022-11-25: Amazon addition.
        # With the current Airflow/Celery configuration this path is not
        # taken. It only becomes active when Celery's task_acks_late is
        # set to False, in which case task messages are removed from SQS
        # immediately instead of after the worker finishes the task. The
        # metric is still reported so the code keeps working should
        # Airflow or MWAA ever disable task_acks_late.
        Stats.incr("mwaa.celery.task_pulled", 1)
        self._update_state_with_tasks(
            [
                (
                    self._get_task_command_from_sqs_message(message["Body"]),
                    message["ReceiptHandle"],
                )
            ],
            self.CeleryStateUpdateAction.ADD,
        )
        # End of Amazon addition.
        sqs_client = self.asynsqs(queue=queue_name)
        sqs_client.delete_message(noack_queue, message["ReceiptHandle"])
        return payload

    try:
        properties = payload["properties"]
        delivery_info = properties["delivery_info"]
    except KeyError:
        # Probably a JSON message that was not produced by kombu: wrap it
        # so that it carries the "body"/"properties" entries used below.
        delivery_info = {}
        properties = {"delivery_info": delivery_info}
        payload.update(body=bytes_to_str(decoded_body), properties=properties)

    # The SQS receipt handle doubles as the delivery tag.
    delivery_info.update(sqs_message=message, sqs_queue=queue)
    properties["delivery_tag"] = message["ReceiptHandle"]
    return payload