in images/airflow/2.10.3/python/mwaa/celery/sqs_broker.py [0:0]
def _new_queue(self, queue, **kwargs):
    """Look up the SQS queue for ``queue``, creating it when it is missing.

    Args:
        queue: Queue name. Any value that is not a ``str`` is returned
            unchanged without touching the cache.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        The non-string ``queue`` argument as-is, or the entry stored in
        ``self._queue_cache`` for the canonical queue name. For a newly
        created queue that entry is the ``"QueueUrl"`` value of the
        ``_create_queue`` response.

    Raises:
        UndefinedQueueException: The queue is not in the cache (even after
            the targeted cache refresh) and ``self.predefined_queues`` is
            set, so queues are not created on demand.
    """
    if not isinstance(queue, str):
        return queue

    # Use the canonical SQS name so lookups agree with the keys used when
    # _queue_cache was initially populated.
    sqs_name = self.canonical_queue_name(queue)

    # SQS ListQueues returns at most 1000 queues, so with many queues the
    # one we want may be absent from the cache. Refresh the cache for this
    # exact name before deciding that the queue does not exist.
    if sqs_name not in self._queue_cache:
        self._update_queue_cache(sqs_name)

    try:
        return self._queue_cache[sqs_name]
    except KeyError:
        # Still unknown: either reject it (predefined queues only) or
        # create it and remember its URL.
        if self.predefined_queues:
            message = (
                "Queue with name '{}' must be "
                "defined in 'predefined_queues'."
            ).format(sqs_name)
            raise UndefinedQueueException(message)

        # Names ending in ".fifo" are created with the FifoQueue attribute.
        fifo_attributes = (
            {"FifoQueue": "true"} if sqs_name.endswith(".fifo") else {}
        )
        attributes = {
            "VisibilityTimeout": str(self.visibility_timeout),
            **fifo_attributes,
        }

        queue_url = self._create_queue(sqs_name, attributes)["QueueUrl"]
        self._queue_cache[sqs_name] = queue_url
        return queue_url