def _put()

in images/airflow/2.10.3/python/mwaa/celery/sqs_broker.py [0:0]


    def _put(self, queue, message, **kwargs):
        """Send ``message`` to ``queue``, or return a redelivered one to it.

        A message whose top-level ``"redelivered"`` entry is truthy is not
        sent again: the task is removed from the tracked Celery state and the
        visibility timeout of the SQS message is set to 0, using the message's
        delivery tag as the receipt handle. Any other message is serialized
        and sent with ``send_message``.

        For queues whose name ends in ``.fifo`` the send request carries a
        ``MessageGroupId`` (``"default"`` unless the message properties supply
        one) and a ``MessageDeduplicationId`` (a fresh UUID4 string unless the
        message properties supply one).

        Args:
            queue: Name of the destination queue.
            message: Message mapping. ``message["properties"]`` is read for
                FIFO queues and for redelivered messages.
            **kwargs: Accepted for interface compatibility; not used.
        """
        queue_url = self._new_queue(queue)
        payload = (
            AsyncMessage().encode(dumps(message))
            if self.sqs_base64_encoding
            else dumps(message)
        )
        request = {"QueueUrl": queue_url, "MessageBody": payload}

        if queue.endswith(".fifo"):
            # FIFO queues get both a group id and a deduplication id; fall
            # back to generated values when the message does not carry them.
            props = message["properties"]
            request["MessageGroupId"] = (
                props["MessageGroupId"] if "MessageGroupId" in props else "default"
            )
            request["MessageDeduplicationId"] = (
                props["MessageDeduplicationId"]
                if "MessageDeduplicationId" in props
                else str(uuid.uuid4())
            )

        client = self.sqs(queue=self.canonical_queue_name(queue))

        if not message.get("redelivered"):
            # 2022-11-25: Amazon addition.
            # This path is taken when the scheduler puts a task in the queue
            # so it can be picked by a Celery worker.
            Stats.incr("mwaa.celery.task_queued", 1)
            # End of Amazon addition
            client.send_message(**request)
            return

        # 2022-11-25: Amazon addition.
        # This path is taken when a task is returned to the queue, e.g. on a
        # worker shutdown:
        # https://github.com/celery/kombu/blob/v4.6.11/kombu/transport/virtual/base.py#L732
        Stats.incr("mwaa.celery.task_returned", 1)
        task_command = self._get_task_command_from_sqs_message(payload)
        receipt_handle = message["properties"]["delivery_tag"]
        self._update_state_with_tasks(
            [(task_command, receipt_handle)],
            self.CeleryStateUpdateAction.REMOVE,
        )
        # End of Amazon addition
        # A visibility timeout of 0 is requested for the returned message
        # instead of sending a new copy.
        client.change_message_visibility(
            QueueUrl=queue_url,
            ReceiptHandle=receipt_handle,
            VisibilityTimeout=0,
        )