in images/airflow/2.10.3/python/mwaa/celery/sqs_broker.py [0:0]
def _put(self, queue, message, **kwargs):
"""Put message onto queue."""
q_url = self._new_queue(queue)
if self.sqs_base64_encoding:
body = AsyncMessage().encode(dumps(message))
else:
body = dumps(message)
kwargs = {"QueueUrl": q_url, "MessageBody": body}
if queue.endswith(".fifo"):
if "MessageGroupId" in message["properties"]:
kwargs["MessageGroupId"] = message["properties"]["MessageGroupId"]
else:
kwargs["MessageGroupId"] = "default"
if "MessageDeduplicationId" in message["properties"]:
kwargs["MessageDeduplicationId"] = message["properties"][
"MessageDeduplicationId"
]
else:
kwargs["MessageDeduplicationId"] = str(uuid.uuid4())
c = self.sqs(queue=self.canonical_queue_name(queue))
if message.get("redelivered"):
# 2022-11-25: Amazon addition.
# This branch is executed when a task is returned to the queue, e.g.
# a worker shutdown:
# https://github.com/celery/kombu/blob/v4.6.11/kombu/transport/virtual/base.py#L732
Stats.incr("mwaa.celery.task_returned", 1)
self._update_state_with_tasks(
[
(
self._get_task_command_from_sqs_message(body),
message["properties"]["delivery_tag"],
)
],
self.CeleryStateUpdateAction.REMOVE,
)
# End of Amazon addition
c.change_message_visibility(
QueueUrl=q_url,
ReceiptHandle=message["properties"]["delivery_tag"],
VisibilityTimeout=0,
)
else:
# 2022-11-25: Amazon addition.
# This branch is executed when the scheduler puts a task in the
# queue so it can be picked by a Celery worker.
Stats.incr("mwaa.celery.task_queued", 1)
# End of Amazon addition
c.send_message(**kwargs)