in images/airflow/2.9.2/python/mwaa/celery/sqs_broker.py [0:0]
def sqs(self, queue=None):
    """Return the SQS client to use for ``queue``, creating it on first use.

    Resolution order:

    1. When ``queue`` is given and ``self.predefined_queues`` is non-empty,
       the queue must be one of the predefined queues.
       - If the ``sts_role_arn`` transport option is set (truthy), the call
         is delegated to ``self._handle_sts_session(queue, queue_config)``
         and its result is returned as-is.
       - Otherwise a per-queue client is looked up in
         ``self._predefined_queue_clients``; on a miss one is built with
         ``self.new_sqs_client`` and stored there. Per-queue ``region``,
         ``access_key_id`` and ``secret_access_key`` entries override the
         channel-wide region and connection credentials.
    2. In every other case the channel-wide client held in ``self._sqs`` is
       returned, being built from the channel region and connection
       credentials the first time it is needed.

    Args:
        queue: Name of the queue the client is needed for, or ``None`` to
            request the channel-wide default client.

    Returns:
        Whatever ``self.new_sqs_client`` / ``self._handle_sts_session``
        produced for the resolved case (cached object on repeat calls for
        the non-STS paths).

    Raises:
        UndefinedQueueException: ``queue`` was given, predefined queues are
            configured, and ``queue`` is not among them.
    """
    if queue is not None and self.predefined_queues:
        if queue not in self.predefined_queues:
            raise UndefinedQueueException(
                f"Queue with name '{queue}' must be defined"
                " in 'predefined_queues'."
            )
        queue_config = self.predefined_queues[queue]

        # Role-based access is handled entirely by the STS helper; the
        # per-queue client cache below is only used without a role ARN.
        if self.transport_options.get("sts_role_arn"):
            return self._handle_sts_session(queue, queue_config)

        queue_clients = self._predefined_queue_clients
        if queue in queue_clients:
            return queue_clients[queue]

        # Per-queue settings win; anything missing falls back to the
        # channel-wide region / connection credentials.
        client = queue_clients[queue] = self.new_sqs_client(
            region=queue_config.get("region", self.region),
            access_key_id=queue_config.get(
                "access_key_id", self.conninfo.userid
            ),
            secret_access_key=queue_config.get(
                "secret_access_key", self.conninfo.password
            ),
        )
        return client

    # No queue requested, or no predefined queues configured: use the single
    # channel-wide client, building it lazily.
    if self._sqs is not None:
        return self._sqs

    client = self._sqs = self.new_sqs_client(
        region=self.region,
        access_key_id=self.conninfo.userid,
        secret_access_key=self.conninfo.password,
    )
    return client