in images/airflow/2.9.2/python/mwaa/celery/sqs_broker.py [0:0]
def asynsqs(self, queue=None):
if queue is not None and self.predefined_queues:
if queue in self._predefined_queue_async_clients and not hasattr(
self, "sts_expiration"
):
return self._predefined_queue_async_clients[queue]
if queue not in self.predefined_queues:
raise UndefinedQueueException(
(
"Queue with name '{}' must be defined in "
"'predefined_queues'."
).format(queue)
)
q = self.predefined_queues[queue]
c = self._predefined_queue_async_clients[queue] = AsyncSQSConnection(
sqs_connection=self.sqs(queue=queue),
region=q.get("region", self.region),
)
return c
if self._asynsqs is not None:
return self._asynsqs
c = self._asynsqs = AsyncSQSConnection(
sqs_connection=self.sqs(queue=queue), region=self.region
)
return c