in dataflux_core/fast_list.py [0:0]
def run(self) -> None:
"""Runs the worker."""
logging.debug(f"Process {self.name} starting...")
if not self.client:
self.client = storage.Client(
project=self.gcs_project,
client_info=ClientInfo(user_agent="dataflux/0.0"),
)
else:
user_agent.add_dataflux_user_agent(self.client)
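        # The range splitter computes split points in the object-name
        # namespace; it is used below when another worker steals work.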
self.splitter = range_splitter.new_rangesplitter(self.default_alph)
        # When the worker has started, attempt to push to all queues. If the idle or unidle queue
# push fails, the worker will not initialize and will be ignored by the controller.
# This allows us to safely handle multiprocessing failures that occur on startup.
self.idle_queue.put(self.name)
self.unidle_queue.put(self.name)
self.heartbeat_queue.put(self.name)
if self.retry_config:
# Post a heartbeat when retrying so the process doesn't get killed.
# The retry class automatically logs the retry as a debug log.
def on_error(e: Exception):
self.heartbeat_queue.put(self.name)
self.retry_config._on_error = on_error
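        # A worker that starts without an assigned range idles until the
        # controller hands it work; wait_for_work returns False on shutdown.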
if self.start_range is None and self.end_range is None:
if not self.wait_for_work():
return
retries_remaining = self.max_retries
while True:
has_results = False
try:
                list_blob_args = {
                    "max_results": self.max_results,
                    "start_offset": self.prefix + self.start_range,
                    "end_offset": ("" if not self.end_range else
                                   self.prefix + self.end_range),
                    "retry": self.retry_config,
                }
if self.prefix:
list_blob_args["prefix"] = self.prefix
blobs = self.client.bucket(
self.bucket).list_blobs(**list_blob_args)
self.api_call_count += 1
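                # Count the blobs returned in this page so a full page
                # (i == max_results) can be detected below.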
i = 0
self.heartbeat_queue.put(self.name)
for blob in blobs:
i += 1
                    # Keep the blob only if it is not a composed object being
                    # skipped, not a directory placeholder (unless directory
                    # objects are requested), and in an allowed storage class.
                    if ((not self.skip_compose
                         or not blob.name.startswith(COMPOSED_PREFIX))
                            and (self.list_directory_objects
                                 or blob.name[-1] != "/")
                            and blob.storage_class
                            in self.allowed_storage_classes):
                        self.results.add((blob.name, blob.size))
# Remove the prefix from the name so that range calculations remain prefix-agnostic.
# This is necessary due to the unbounded end-range when splitting string namespaces
# of unknown size.
self.start_range = remove_prefix(blob.name, self.prefix)
if i == self.max_results:
# Only allow work stealing when paging.
has_results = True
break
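                # The page was consumed without errors; restore the retry budget.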
retries_remaining = self.max_retries
except Exception as e:
retries_remaining -= 1
                logging.error(
                    f"process {self.name} encountered error "
                    f"({retries_remaining} retries left): {e}")
                if retries_remaining == 0:
                    logging.error(
                        f"process {self.name} is out of retries; exiting")
self.error_queue.put(e)
return
continue
if has_results:
# Check for work stealing.
try:
self.send_work_stealing_needed_queue.get_nowait()
except queue.Empty:
continue
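                # Another worker is idle: split the remaining range, keep the
                # lower half, and publish the upper half for direct pickup.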
split_points = self.splitter.split_range(
self.start_range, self.end_range, 1)
steal_range = (split_points[0], self.end_range)
self.direct_work_available_queue.put(steal_range)
self.end_range = split_points[0]
self.max_results = 5000
else:
# All done, wait for work.
if len(self.results) > 0:
self.results_queue.put(self.results)
self.results = set()
if not self.wait_for_work():
return