in src/output.py [0:0]
def progress(self, futures):
"""
Print a progressbar
Args:
futures: dictionary where Keys = Name of the thread, Value = concurrent.futures object.
The function being executed MUST return a dictionary with 'status' key that defines
the status code.
"""
done = defaultdict(bool)
isatty = sys.stdout.isatty()
with reprint.output(
output_type="list", initial_len=len(futures.items()), interval=0
) as output:
num_iterations = 0
self.print_lines(output)
while True:
i = 0
num_iterations += 1
for image, thread in futures.items():
output[i] = image
if thread.done():
output[i] += (
"." * 10 + constants.STATUS_MESSAGE[futures[image].result()]
)
done[image] = True
else:
output[i] += "." * (num_iterations % 10)
done[image] = False
i += 1
if all(done.values()):
break
time.sleep(1)
self.print_lines(output)