in dataflux_core/fast_list.py [0:0]
def run(self) -> list[tuple[str, int]]:
"""Runs the controller that manages fast listing.
Returns:
A sorted list of (str, int) tuples indicating the name and file size of each
unique file listed in the listing process.
"""
# Define the queues.
send_work_stealing_needed_queue: multiprocessing.Queue[str] = (
multiprocessing.Queue())
heartbeat_queue: multiprocessing.Queue[str] = multiprocessing.Queue()
direct_work_available_queue: multiprocessing.Queue[tuple[str, str]] = (
multiprocessing.Queue())
idle_queue: multiprocessing.Queue[str] = multiprocessing.Queue()
unidle_queue: multiprocessing.Queue[str] = multiprocessing.Queue()
results_queue: multiprocessing.Queue[set[tuple[str, int]]] = (
multiprocessing.Queue())
metadata_queue: multiprocessing.Queue[tuple[str, int]] = (
multiprocessing.Queue())
error_queue: multiprocessing.Queue[Exception] = multiprocessing.Queue()
processes = []
results: set[tuple[str, int]] = set()
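# Start one listing worker process per unit of allowed parallelism.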
for i in range(self.max_parallelism):
p = multiprocessing.Process(
target=run_list_worker,
args=(
"dataflux-listing-proc." + str(i),
self.gcs_project,
self.bucket,
send_work_stealing_needed_queue,
heartbeat_queue,
direct_work_available_queue,
idle_queue,
unidle_queue,
results_queue,
metadata_queue,
error_queue,
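# Only the first worker is seeded with the full ("", "") listing range;
# the rest start with None and acquire work through work stealing.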
"" if i == 0 else None,
"" if i == 0 else None,
self.retry_config,
self.client,
self.skip_compose,
self.prefix,
self.allowed_storage_classes,
),
)
processes.append(p)
p.start()
# Wait before starting the next process to avoid deadlock when multiple processes
# attempt to register with the same multiprocessing queue.
time.sleep(0.1)
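# Monitor the workers: surface errors, collect results, and track
# idle/heartbeat status until all listing work is done.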
while True:
time.sleep(0.2)
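# Abort immediately if any worker reported an exception.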
try:
e = error_queue.get_nowait()
logging.error(
f"Got error from child process; exiting. Check child process logs for more details. Error: {e}"
)
return self.terminate_now(processes)
except queue.Empty:
pass
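# Check whether any worker process is still running.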
alive = False
for p in processes:
if p.is_alive():
alive = True
break
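# Drain all results currently available on the results queue.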
new_results = set()
while True:
try:
result = results_queue.get_nowait()
new_results.update(result)
except queue.Empty:
break
if len(new_results) > 0:
results.update(new_results)
logging.debug(f"Result count: {len(results)}")
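# All worker processes have exited; stop monitoring.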
if not alive:
break
# Update all queues related to tracking process status.
self.manage_tracking_queues(idle_queue, unidle_queue,
heartbeat_queue)
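# Terminate everything if any worker appears to have crashed.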
if self.check_crashed_processes():
return self.terminate_now(processes)
logging.debug("Inited procs: %d", len(self.inited))
logging.debug("Waiting for work: %d", self.waiting_for_work)
if len(self.inited) == self.waiting_for_work and (
self.waiting_for_work > 0):
logging.debug("Exiting, all processes are waiting for work")
for _ in range(self.max_parallelism * 2):
direct_work_available_queue.put((None, None))
break
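# Pick up any results that arrived after the monitoring loop exited.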
while True:
try:
result = results_queue.get_nowait()
results.update(result)
logging.debug(f"Result count: {len(results)}")
except queue.Empty:
break
logging.debug("Got all results, waiting for processes to exit.")
return self.cleanup_processes(processes, results_queue, metadata_queue,
results)