def run()

in dataflux_core/fast_list.py


    def run(self) -> list[tuple[str, int]]:
        """Runs the controller that manages fast listing.

        Returns:
          A sorted list of (str, int) tuples giving the name and size of each
          unique object found during the listing process.
        """
        # Define the queues.
        send_work_stealing_needed_queue: multiprocessing.Queue[str] = (
            multiprocessing.Queue())
        heartbeat_queue: multiprocessing.Queue[str] = multiprocessing.Queue()
        direct_work_available_queue: multiprocessing.Queue[tuple[str, str]] = (
            multiprocessing.Queue())
        idle_queue: multiprocessing.Queue[str] = multiprocessing.Queue()
        unidle_queue: multiprocessing.Queue[str] = multiprocessing.Queue()
        results_queue: multiprocessing.Queue[set[tuple[str, int]]] = (
            multiprocessing.Queue())
        metadata_queue: multiprocessing.Queue[tuple[str, int]] = (
            multiprocessing.Queue())
        error_queue: multiprocessing.Queue[Exception] = multiprocessing.Queue()
        processes = []
        results: set[tuple[str, int]] = set()
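        # Spawn one listing worker per degree of parallelism; every worker
        # shares the queues defined above.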
        for i in range(self.max_parallelism):
            p = multiprocessing.Process(
                target=run_list_worker,
                args=(
                    "dataflux-listing-proc." + str(i),
                    self.gcs_project,
                    self.bucket,
                    send_work_stealing_needed_queue,
                    heartbeat_queue,
                    direct_work_available_queue,
                    idle_queue,
                    unidle_queue,
                    results_queue,
                    metadata_queue,
                    error_queue,
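                    # Only the first worker starts with a concrete ("", "")
                    # listing range; the rest start with no range and acquire
                    # work through work stealing.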
                    "" if i == 0 else None,
                    "" if i == 0 else None,
                    self.retry_config,
                    self.client,
                    self.skip_compose,
                    self.prefix,
                    self.allowed_storage_classes,
                ),
            )
            processes.append(p)
            p.start()
            # Wait before starting the next process to avoid deadlock when multiple processes
            # attempt to register with the same multiprocessing queue.
            time.sleep(0.1)
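        # Main monitoring loop: watch for worker errors, drain results, track
        # worker liveness, and decide when all listing work is complete.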
        while True:
            time.sleep(0.2)
            try:
                e = error_queue.get_nowait()
                logging.error(
                    f"Got error from child process; exiting. Check child process logs for more details. Error: {e}"
                )
                return self.terminate_now(processes)
            except queue.Empty:
                pass
            # Check whether any listing worker process is still running.
            alive = any(p.is_alive() for p in processes)
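            # Drain every result set currently available without blocking.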
            new_results = set()
            while True:
                try:
                    result = results_queue.get_nowait()
                    new_results.update(result)
                except queue.Empty:
                    break
            if len(new_results) > 0:
                results.update(new_results)
                logging.debug(f"Result count: {len(results)}")
            if not alive:
                break
            # Update all queues related to tracking process status.
            self.manage_tracking_queues(idle_queue, unidle_queue,
                                        heartbeat_queue)
            if self.check_crashed_processes():
                return self.terminate_now(processes)
            logging.debug("Inited procs: %d", len(self.inited))
            logging.debug("Waiting for work: %d", self.waiting_for_work)
            if len(self.inited) == self.waiting_for_work and (
                    self.waiting_for_work > 0):
                logging.debug("Exiting, all processes are waiting for work")
                for _ in range(self.max_parallelism * 2):
                    direct_work_available_queue.put((None, None))
                break
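        # Collect any results that were enqueued while the main loop was exiting.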
        while True:
            try:
                result = results_queue.get_nowait()
                results.update(result)
                logging.debug(f"Result count: {len(results)}")
            except queue.Empty:
                break
        logging.debug("Got all results, waiting for processes to exit.")
        return self.cleanup_processes(processes, results_queue, metadata_queue,
                                      results)
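
For context, here is a minimal usage sketch. It assumes the enclosing class is dataflux_core.fast_list.ListingController and that its constructor accepts the parallelism, project, and bucket referenced above; the class name, argument order, and parameter meanings are assumptions, not a confirmed signature.

    from dataflux_core import fast_list

    # Hypothetical invocation: build the controller and run the parallel
    # listing, receiving a sorted list of (object name, size) tuples.
    controller = fast_list.ListingController(
        8,             # assumed: max_parallelism
        "my-project",  # assumed: GCS project
        "my-bucket",   # assumed: bucket to list
    )
    objects = controller.run()
    print(f"Listed {len(objects)} objects")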