def sqs()

in images/airflow/2.10.3/python/mwaa/celery/sqs_broker.py [0:0]


    def sqs(self, queue=None):
        """Return the SQS client to use for ``queue``.

        When ``queue`` is given and ``predefined_queues`` is non-empty, the
        queue must be listed there. Its client then comes from
        ``_handle_sts_session`` if the ``sts_role_arn`` transport option is
        set; otherwise a per-queue client is built on first use and cached in
        ``_predefined_queue_clients``. The queue's own ``region``,
        ``access_key_id`` and ``secret_access_key`` entries take precedence
        over the connection-wide values.

        In every other case the shared default client is returned; it is
        built on first use and cached in ``_sqs``.

        Raises:
            UndefinedQueueException: if ``queue`` is missing from a non-empty
                ``predefined_queues``.
        """
        # No queue requested, or nothing predefined: use the shared client.
        if queue is None or not self.predefined_queues:
            shared = self._sqs
            if shared is None:
                shared = self.new_sqs_client(
                    region=self.region,
                    access_key_id=self.conninfo.userid,
                    secret_access_key=self.conninfo.password,
                )
                self._sqs = shared
            return shared

        known_queues = self.predefined_queues
        if queue not in known_queues:
            raise UndefinedQueueException(
                f"Queue with name '{queue}' must be defined"
                " in 'predefined_queues'."
            )
        queue_conf = known_queues[queue]

        # With an STS role configured, client creation is delegated entirely.
        if self.transport_options.get("sts_role_arn"):
            return self._handle_sts_session(queue, queue_conf)

        # Otherwise build the per-queue client once, then reuse it.
        clients = self._predefined_queue_clients
        if queue not in clients:
            clients[queue] = self.new_sqs_client(
                region=queue_conf.get("region", self.region),
                access_key_id=queue_conf.get(
                    "access_key_id", self.conninfo.userid
                ),
                secret_access_key=queue_conf.get(
                    "secret_access_key", self.conninfo.password
                ),
            )
        return clients[queue]