def _message_to_python()

in images/airflow/2.9.2/python/mwaa/celery/sqs_broker.py [0:0]


    def _message_to_python(self, message, queue_name, queue):
        body = self._optional_b64_decode(message["Body"].encode())
        payload = loads(bytes_to_str(body))
        if queue_name in self._noack_queues:
            queue = self._new_queue(queue_name)
            # 2022-11-25: Amazon addition.
            # This branch won't be called with our current configuration for
            # Airflow/Celery. It will be called only when task_acks_late Celery
            # configuration is set to False, resulting in task messages being
            # deleted from the SQS immediately, rather than waiting for the
            # worker to finish the task. However, we still report a metric to
            # make sure the code is future-proof, e.g. if Airflow or MWAA decide
            # to disable the task_acks_late configuration.
            Stats.incr("mwaa.celery.task_pulled", 1)
            self._update_state_with_tasks(
                [
                    (
                        self._get_task_command_from_sqs_message(message["Body"]),
                        message["ReceiptHandle"],
                    )
                ],
                self.CeleryStateUpdateAction.ADD,
            )
            # End of Amazon addition.
            self.asynsqs(queue=queue_name).delete_message(
                queue,
                message["ReceiptHandle"],
            )
        else:
            try:
                properties = payload["properties"]
                delivery_info = payload["properties"]["delivery_info"]
            except KeyError:
                # json message not sent by kombu?
                delivery_info = {}
                properties = {"delivery_info": delivery_info}
                payload.update(
                    {
                        "body": bytes_to_str(body),
                        "properties": properties,
                    }
                )
            # set delivery tag to SQS receipt handle
            delivery_info.update(
                {
                    "sqs_message": message,
                    "sqs_queue": queue,
                }
            )
            properties["delivery_tag"] = message["ReceiptHandle"]
        return payload