def run()

in dataflux_core/fast_list.py [0:0]


    def run(self) -> None:
        """Runs the worker."""
        logging.debug(f"Process {self.name} starting...")
        if not self.client:
            self.client = storage.Client(
                project=self.gcs_project,
                client_info=ClientInfo(user_agent="dataflux/0.0"),
            )
        else:
            user_agent.add_dataflux_user_agent(self.client)
        self.splitter = range_splitter.new_rangesplitter(self.default_alph)
        # When worker has started, attempt to push to all queues. If the idle or unidle queue
        # push fails, the worker will not initialize and will be ignored by the controller.
        # This allows us to safely handle multiprocessing failures that occur on startup.
        self.idle_queue.put(self.name)
        self.unidle_queue.put(self.name)
        self.heartbeat_queue.put(self.name)
        if self.retry_config:
            # Post a heartbeat when retrying so the process doesn't get killed.
            # The retry class automatically logs the retry as a debug log.
            def on_error(e: Exception):
                self.heartbeat_queue.put(self.name)

            self.retry_config._on_error = on_error
        if self.start_range is None and self.end_range is None:
            if not self.wait_for_work():
                return
        retries_remaining = self.max_retries
        while True:
            has_results = False
            try:
                list_blob_args = {
                    "max_results":
                    self.max_results,
                    "start_offset":
                    self.prefix + self.start_range,
                    "end_offset": ("" if not self.end_range else self.prefix +
                                   self.end_range),
                    "retry":
                    self.retry_config,
                }
                if self.prefix:
                    list_blob_args["prefix"] = self.prefix
                blobs = self.client.bucket(
                    self.bucket).list_blobs(**list_blob_args)
                self.api_call_count += 1
                i = 0
                self.heartbeat_queue.put(self.name)
                for blob in blobs:
                    i += 1
                    if ((not self.skip_compose
                         or not blob.name.startswith(COMPOSED_PREFIX)) and
                        (self.list_directory_objects or blob.name[-1] != "/")
                            and blob.storage_class
                            in self.allowed_storage_classes):
                        self.results.add((blob.name, blob.size))
                    # Remove the prefix from the name so that range calculations remain prefix-agnostic.
                    # This is necessary due to the unbounded end-range when splitting string namespaces
                    # of unknown size.
                    self.start_range = remove_prefix(blob.name, self.prefix)
                    if i == self.max_results:
                        # Only allow work stealing when paging.
                        has_results = True
                        break
                retries_remaining = self.max_retries
            except Exception as e:
                retries_remaining -= 1
                logging.error(
                    f"process {self.name} encountered error ({retries_remaining} retries left): {str(e)}"
                )
                if retries_remaining == 0:
                    logging.error("process " + self.name +
                                  " is out of retries; exiting")
                    self.error_queue.put(e)
                    return
                continue
            if has_results:
                # Check for work stealing.
                try:
                    self.send_work_stealing_needed_queue.get_nowait()
                except queue.Empty:
                    continue
                split_points = self.splitter.split_range(
                    self.start_range, self.end_range, 1)
                steal_range = (split_points[0], self.end_range)
                self.direct_work_available_queue.put(steal_range)
                self.end_range = split_points[0]
                self.max_results = 5000
            else:
                # All done, wait for work.
                if len(self.results) > 0:
                    self.results_queue.put(self.results)
                    self.results = set()
                if not self.wait_for_work():
                    return