in scripts/render/pipeline.py [0:0]
def run_halted_queue(self, params, frame_chunks):
"""Runs a queue with params for each of the frame chunks. The program halts while
awaiting the completion of tasks in the queue and shows a progress bar meanwhile. Any
frame chunks that have been previously completed will be marked as complete unless
running with force_recompute.
Args:
params (dict[str, _]): Message to be published to RabbitMQ.
frame_chunks (list[dict[str, str]]): List of frame chunk with keys
"first" and "last" corresponding to the appropriate frame names for the chunk.
"""
connection = pika.BlockingConnection(
pika.ConnectionParameters(self.master_ip, heartbeat=0)
)
channel = connection.channel()
channel.queue_declare(queue=config.QUEUE_NAME)
channel.queue_declare(queue=config.RESPONSE_QUEUE_NAME)
self.purge_queue(config.QUEUE_NAME)
self.purge_queue(config.RESPONSE_QUEUE_NAME)
# force_recompute can be specified over the entire pipeline or particular stages
frame_chunks = self._get_missing_chunks(params, frame_chunks)
if len(frame_chunks) == 0:
return
for frame_chunk in frame_chunks:
params.update(frame_chunk)
msg = json.dumps(params)
channel.basic_publish(
exchange="",
routing_key=config.QUEUE_NAME,
body=msg,
properties=pika.BasicProperties(
delivery_mode=2
), # make message persistent
)
# Waits until the queue is empty before returning for next step
queue_state = channel.queue_declare(config.RESPONSE_QUEUE_NAME)
queue_size = queue_state.method.message_count
progress = "█"
widgets = [
f"{progress} ",
f"{params['app']}:",
progressbar.Bar(progress, "|", "|"),
progressbar.Percentage(),
" (Workers: ",
progressbar.FormatLabel("0"),
") (",
progressbar.FormatLabel("%(elapsed)s"),
")",
]
bar = progressbar.ProgressBar(maxval=len(frame_chunks), widgets=widgets)
bar.start()
no_worker_period = None
while queue_size != len(frame_chunks):
time.sleep(1.0)
queue_size = channel.queue_declare(
config.RESPONSE_QUEUE_NAME
).method.message_count
num_workers = channel.queue_declare(config.QUEUE_NAME).method.consumer_count
widgets[5] = str(num_workers)
if num_workers != 0:
no_worker_period = None
if num_workers == 0:
if no_worker_period is None:
no_worker_period = time.time()
if time.time() - no_worker_period > config.NO_WORKER_TIMEOUT:
raise Exception(
"No workers for extended time! Check worker logs for errors..."
)
bar.update(queue_size)
bar.finish()