def run_halted_queue()

in scripts/render/pipeline.py [0:0]


    def run_halted_queue(self, params, frame_chunks):
        """Runs a queue with params for each of the frame chunks. The program halts while
        awaiting the completion of tasks in the queue and shows a progress bar meanwhile. Any
        frame chunks that have been previously completed will be marked as complete unless
        running with force_recompute.

        Args:
            params (dict[str, _]): Message to be published to RabbitMQ.
            frame_chunks (list[dict[str, str]]): List of frame chunk with keys
                "first" and "last" corresponding to the appropriate frame names for the chunk.
        """
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(self.master_ip, heartbeat=0)
        )
        channel = connection.channel()
        channel.queue_declare(queue=config.QUEUE_NAME)
        channel.queue_declare(queue=config.RESPONSE_QUEUE_NAME)

        self.purge_queue(config.QUEUE_NAME)
        self.purge_queue(config.RESPONSE_QUEUE_NAME)

        # force_recompute can be specified over the entire pipeline or particular stages
        frame_chunks = self._get_missing_chunks(params, frame_chunks)
        if len(frame_chunks) == 0:
            return

        for frame_chunk in frame_chunks:
            params.update(frame_chunk)
            msg = json.dumps(params)
            channel.basic_publish(
                exchange="",
                routing_key=config.QUEUE_NAME,
                body=msg,
                properties=pika.BasicProperties(
                    delivery_mode=2
                ),  # make message persistent
            )

        # Waits until the queue is empty before returning for next step
        queue_state = channel.queue_declare(config.RESPONSE_QUEUE_NAME)
        queue_size = queue_state.method.message_count

        progress = "█"
        widgets = [
            f"{progress} ",
            f"{params['app']}:",
            progressbar.Bar(progress, "|", "|"),
            progressbar.Percentage(),
            " (Workers: ",
            progressbar.FormatLabel("0"),
            ") (",
            progressbar.FormatLabel("%(elapsed)s"),
            ")",
        ]
        bar = progressbar.ProgressBar(maxval=len(frame_chunks), widgets=widgets)
        bar.start()
        no_worker_period = None
        while queue_size != len(frame_chunks):
            time.sleep(1.0)
            queue_size = channel.queue_declare(
                config.RESPONSE_QUEUE_NAME
            ).method.message_count
            num_workers = channel.queue_declare(config.QUEUE_NAME).method.consumer_count
            widgets[5] = str(num_workers)

            if num_workers != 0:
                no_worker_period = None
            if num_workers == 0:
                if no_worker_period is None:
                    no_worker_period = time.time()
                if time.time() - no_worker_period > config.NO_WORKER_TIMEOUT:
                    raise Exception(
                        "No workers for extended time! Check worker logs for errors..."
                    )
            bar.update(queue_size)
        bar.finish()