def _new_queue()

in images/airflow/2.10.1/python/mwaa/celery/sqs_broker.py [0:0]


    def _new_queue(self, queue, **kwargs):
        """Ensure a queue with given name exists in SQS."""
        if not isinstance(queue, str):
            return queue
        # Translate to SQS name for consistency with initial
        # _queue_cache population.
        queue = self.canonical_queue_name(queue)

        # The SQS ListQueues method only returns 1000 queues.  When you have
        # so many queues, it's possible that the queue you are looking for is
        # not cached.  In this case, we could update the cache with the exact
        # queue name first.
        if queue not in self._queue_cache:
            self._update_queue_cache(queue)
        try:
            return self._queue_cache[queue]
        except KeyError:
            if self.predefined_queues:
                raise UndefinedQueueException(
                    (
                        "Queue with name '{}' must be "
                        "defined in 'predefined_queues'."
                    ).format(queue)
                )

            attributes = {"VisibilityTimeout": str(self.visibility_timeout)}
            if queue.endswith(".fifo"):
                attributes["FifoQueue"] = "true"

            resp = self._create_queue(queue, attributes)
            self._queue_cache[queue] = resp["QueueUrl"]
            return resp["QueueUrl"]