dataflux_core/fast_list.py (419 lines of code) (raw):

""" Copyright 2023 Google LLC Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at https://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ from __future__ import annotations import logging import multiprocessing import queue import time from dataflux_core import range_splitter, user_agent from dataflux_core.download import COMPOSED_PREFIX from google.api_core.client_info import ClientInfo from google.cloud import storage from google.cloud.storage.retry import DEFAULT_RETRY DEFAULT_ALLOWED_CLASS = ["STANDARD"] MODIFIED_RETRY = DEFAULT_RETRY.with_deadline(300.0).with_delay(initial=1.0, multiplier=1.2, maximum=45.0) def remove_prefix(text: str, prefix: str): """Helper function that removes prefix from a string. Args: text: String of text to trim a prefix from. prefix: String of text that will be trimmed from text. Returns: Text value with the specified prefix removed. """ # Note that as of python 3.9 removeprefix is built into string. if text.startswith(prefix): return text[len(prefix):] return text class ListWorker(object): """Worker that lists a range of objects from a GCS bucket. Attributes: name: String name of the worker. gcs_project: The string name of the google cloud storage project to list from. bucket: The string name of the storage bucket to list from.from . import fast_list, download send_work_stealing_needed_queue: Multiprocessing queue pushed to when a worker needs more work. heartbeat_queue: Multiprocessing queue pushed to indicating worker is running nominally. direct_work_available_queue: Multiprocessing queue to push availble work stealing ranges to. idle_queue: Multiprocessing queue pushed to when worker is waiting for new work to steal. unidle_queue: Multiprocessing queue pushed to when the worker has successfully stolen work. results_queue: Multiprocessing queue on which the worker pushes its listing results onto. metadata_queue: Multiprocessing queue on which the worker pushes tracking metadata. start_range: Stirng start range worker will begin listing from. end_range: String end range worker will list until. retry_config: The retry parameter to supply to list_blob. results: Set storing aggregate results prior to pushing onto results_queue. client: The GCS client through which all GCS list operations are executed. skip_compose: When true, skip listing files with the composed object prefix. list_directory_objects: When true, include files with names ending in "/" in the listing. Default false. prefix: When provided, only list objects under this prefix. allowed_storage_classes: The set of GCS Storage Class types fast list will include. max_results: The maximum results per list call (set to max page size of 5000). splitter: The range_splitter object used by this worker to divide work. default_alph: The baseline alphabet used to initialize the range_splitter. api_call_count: Variable tracking the number of GCS list calls made by the worker. """ def __init__( self, name: str, gcs_project: str, bucket: str, send_work_stealing_needed_queue: "multiprocessing.Queue[str]", heartbeat_queue: "multiprocessing.Queue[str]", direct_work_available_queue: "multiprocessing.Queue[tuple[str, str]]", idle_queue: "multiprocessing.Queue[str]", unidle_queue: "multiprocessing.Queue[str]", results_queue: "multiprocessing.Queue[set[tuple[str, int]]]", metadata_queue: "multiprocessing.Queue[tuple[str, int]]", error_queue: "multiprocessing.Queue[Exception]", start_range: str, end_range: str, retry_config: "google.api_core.retry.retry_unary.Retry" = MODIFIED_RETRY, client: storage.Client = None, skip_compose: bool = True, list_directory_objects: bool = False, prefix: str = "", allowed_storage_classes: list[str] = DEFAULT_ALLOWED_CLASS, max_retries: int = 5, ): self.name = name self.gcs_project = gcs_project self.bucket = bucket self.send_work_stealing_needed_queue = send_work_stealing_needed_queue self.heartbeat_queue = heartbeat_queue self.direct_work_available_queue = direct_work_available_queue self.idle_queue = idle_queue self.unidle_queue = unidle_queue self.results_queue = results_queue self.metadata_queue = metadata_queue self.error_queue = error_queue self.start_range = start_range self.end_range = end_range self.results: set[tuple[str, int]] = set() self.client = client self.max_results = 5000 self.splitter = None self.default_alph = "ab" self.skip_compose = skip_compose self.list_directory_objects = list_directory_objects self.prefix = prefix if prefix else "" self.allowed_storage_classes = allowed_storage_classes self.api_call_count = 0 self.max_retries = max_retries self.retry_config = retry_config def wait_for_work(self) -> bool: """Indefinitely waits for available work and consumes it once available. Returns: Boolean value indicating that new work has been acquired. The function will only return False in response to receiving a shutdown signal (None) from the controller. """ self.send_work_stealing_needed_queue.put(self.name) self.idle_queue.put(self.name) logging.debug(f"Process {self.name} waiting for work...") while True: try: self.heartbeat_queue.put(self.name) new_range = self.direct_work_available_queue.get_nowait() # None is pushed onto the queue as the shutdown signal once all work is finished. if new_range[0] != None: self.unidle_queue.put(self.name) except queue.Empty: time.sleep(0.1) continue break if new_range[0] is None: logging.debug(f"Process {self.name} didn't receive work") # Upon receiving shutdown signal log all relevant metadata. md = (self.name, self.api_call_count) self.metadata_queue.put(md) return False self.start_range = new_range[0] self.end_range = new_range[1] logging.debug(f"Process {self.name} got new range [{self.start_range}," f" {self.end_range}]") return True def run(self) -> None: """Runs the worker.""" logging.debug(f"Process {self.name} starting...") if not self.client: self.client = storage.Client( project=self.gcs_project, client_info=ClientInfo(user_agent="dataflux/0.0"), ) else: user_agent.add_dataflux_user_agent(self.client) self.splitter = range_splitter.new_rangesplitter(self.default_alph) # When worker has started, attempt to push to all queues. If the idle or unidle queue # push fails, the worker will not initialize and will be ignored by the controller. # This allows us to safely handle multiprocessing failures that occur on startup. self.idle_queue.put(self.name) self.unidle_queue.put(self.name) self.heartbeat_queue.put(self.name) if self.retry_config: # Post a heartbeat when retrying so the process doesn't get killed. # The retry class automatically logs the retry as a debug log. def on_error(e: Exception): self.heartbeat_queue.put(self.name) self.retry_config._on_error = on_error if self.start_range is None and self.end_range is None: if not self.wait_for_work(): return retries_remaining = self.max_retries while True: has_results = False try: list_blob_args = { "max_results": self.max_results, "start_offset": self.prefix + self.start_range, "end_offset": ("" if not self.end_range else self.prefix + self.end_range), "retry": self.retry_config, } if self.prefix: list_blob_args["prefix"] = self.prefix blobs = self.client.bucket( self.bucket).list_blobs(**list_blob_args) self.api_call_count += 1 i = 0 self.heartbeat_queue.put(self.name) for blob in blobs: i += 1 if ((not self.skip_compose or not blob.name.startswith(COMPOSED_PREFIX)) and (self.list_directory_objects or blob.name[-1] != "/") and blob.storage_class in self.allowed_storage_classes): self.results.add((blob.name, blob.size)) # Remove the prefix from the name so that range calculations remain prefix-agnostic. # This is necessary due to the unbounded end-range when splitting string namespaces # of unknown size. self.start_range = remove_prefix(blob.name, self.prefix) if i == self.max_results: # Only allow work stealing when paging. has_results = True break retries_remaining = self.max_retries except Exception as e: retries_remaining -= 1 logging.error( f"process {self.name} encountered error ({retries_remaining} retries left): {str(e)}" ) if retries_remaining == 0: logging.error("process " + self.name + " is out of retries; exiting") self.error_queue.put(e) return continue if has_results: # Check for work stealing. try: self.send_work_stealing_needed_queue.get_nowait() except queue.Empty: continue split_points = self.splitter.split_range( self.start_range, self.end_range, 1) steal_range = (split_points[0], self.end_range) self.direct_work_available_queue.put(steal_range) self.end_range = split_points[0] self.max_results = 5000 else: # All done, wait for work. if len(self.results) > 0: self.results_queue.put(self.results) self.results = set() if not self.wait_for_work(): return def run_list_worker( name: str, gcs_project: str, bucket: str, send_work_stealing_needed_queue: "multiprocessing.Queue[str]", heartbeat_queue: "multiprocessing.Queue[str]", direct_work_available_queue: "multiprocessing.Queue[tuple[str, str]]", idle_queue: "multiprocessing.Queue[str]", unidle_queue: "multiprocessing.Queue[str]", results_queue: "multiprocessing.Queue[set[tuple[str, int]]]", metadata_queue: "multiprocessing.Queue[tuple[str, int]]", error_queue: "multiprocessing.Queue[Exception]", start_range: str, end_range: str, retry_config: "google.api_core.retry.retry_unary.Retry" = MODIFIED_RETRY, client: storage.Client = None, skip_compose: bool = True, prefix: str = "", allowed_storage_classes: list[str] = DEFAULT_ALLOWED_CLASS, ) -> None: """Helper function to execute a ListWorker. Args: name: String name of the list worker. gcs_project: String name of the google cloud project in use. bucket: String name of the google cloud bucket to list from. send_work_stealing_needed_queue: Multiprocessing queue pushed to when a worker needs more work. heartbeat_queue: Multiprocessing queue pushed to while a worker is running nominally. direct_work_available_queue: Multiprocessing queue to push availble work stealing ranges to. idle_queue: Multiprocessing queue pushed to when worker is waiting for new work to steal. unidle_queue: Multiprocessing queue pushed to when the worker has successfully stolen work. results_queue: Multiprocessing queue on which the worker pushes its listing results onto. metadata_queue: Multiprocessing queue on which the worker pushes tracking metadata. error_queue: Multiprocessing queue to track errors from the worker process. start_range: String start range worker will begin listing from. end_range: String end range worker will list until. retry_config: The retry parameter to supply to list_blob. client: The GCS storage client. When not provided, will be derived from background auth. skip_compose: When true, skip listing files with the composed object prefix. prefix: When provided, only list objects under this prefix. allowed_storage_classes: The set of GCS Storage Class types fast list will include. """ ListWorker( name, gcs_project, bucket, send_work_stealing_needed_queue, heartbeat_queue, direct_work_available_queue, idle_queue, unidle_queue, results_queue, metadata_queue, error_queue, start_range, end_range, retry_config, client, skip_compose=skip_compose, prefix=prefix, allowed_storage_classes=allowed_storage_classes, ).run() class ListingController(object): """This controller manages and monitors all listing workers operating on the GCS bucket. Attributes: max_parallelism: The maximum number of processes to start via the Multiprocessing library. gcs_project: The string name of the google cloud storage project to list from. bucket: The string name of the storage bucket to list from. inited: The set of ListWorker processes that have succesfully started. checkins: A dictionary tracking the last known checkin time for each inited ListWorker. waiting_for_work: The number of ListWorker processes currently waiting for new listing work. sort_results: Boolean indicating whether the final result set should be sorted or unsorted. skip_compose: When true, skip listing files with the composed object prefix. prefix: When provided, only list objects under this prefix. allowed_storage_classes: The set of GCS Storage Class types fast list will include. retry_config: The retry config passed to list_blobs. """ def __init__( self, max_parallelism: int, project: str, bucket: str, sort_results: bool = False, skip_compose: bool = True, prefix: str = "", allowed_storage_classes: list[str] = DEFAULT_ALLOWED_CLASS, retry_config=MODIFIED_RETRY, ): # The maximum number of threads utilized in the fast list operation. self.max_parallelism = max_parallelism self.gcs_project = project self.bucket = bucket self.inited = set() self.checkins = {} self.waiting_for_work = 0 self.sort_results = sort_results self.client = None self.skip_compose = skip_compose self.prefix = prefix self.allowed_storage_classes = allowed_storage_classes self.retry_config = retry_config def manage_tracking_queues( self, idle_queue: "multiprocessing.Queue[str]", unidle_queue: "multiprocessing.Queue[str]", heartbeat_queue: "multiprocessing.Queue[str]", ) -> None: """Manages metadata queues to track execution of the listing operation. Args: idle_queue: the queue workers push to when in need of new work to steal. unidle_queue: the queue workers push to when they steal work. heartbeat_queue: the queue workers push to continuously while running nominally. """ while True: try: idle_queue.get_nowait() self.waiting_for_work += 1 except queue.Empty: break while True: try: unidle_queue.get_nowait() self.waiting_for_work -= 1 except queue.Empty: break while True: try: inited_worker = heartbeat_queue.get_nowait() current_time = time.time() self.inited.add(inited_worker) self.checkins[inited_worker] = current_time except queue.Empty: break def check_crashed_processes(self) -> bool: """Checks if any processes have crashed. Returns: A boolean indicating if any processes have crashed after initialization. If this function returns true, it indicates a need to restart the listing operation. """ logging.debug("checking for crashed procs...") now = time.time() crashed = [] # Wait at least 60 seconds or 2 times the API call retry delay for check-ins, # otherwise processes might appear to be crashed while retrying API calls. checkin_wait = 2 * self.retry_config._maximum if self.retry_config else 0 checkin_wait = max(checkin_wait, 60) for inited_worker, last_checkin in self.checkins.items(): if now - last_checkin > checkin_wait: crashed.append(inited_worker) for proc in crashed: if proc in self.inited: logging.error( "process crash detected, ending list procedure...") return True return False def cleanup_processes( self, processes: "list[multiprocessing.Process]", results_queue: "multiprocessing.Queue[set[tuple[str, int]]]", metadata_queue: "multiprocessing.Queue[tuple[str, int]]", results: "set[tuple[str, int]]", ) -> list[tuple[str, int]]: """Allows processes to shut down, kills procs that failed to initialize. Args: processes: the list of processes. results_queue: the queue for transmitting all result tuples from listing. metadata_queue: the queue for transmitting all tracking metadata from workers. results: the set of unique results consumed from results_queue. Returns: A sorted list of (str, int) tuples indicating the name and file size of each unique file listed in the listing process. """ api_call_count = 0 while True: alive = False live_procs = 0 for p in processes: if p.is_alive(): alive = True live_procs += 1 while True: try: result = results_queue.get_nowait() results.update(result) logging.debug(f"Result count: {len(results)}") except queue.Empty: break time.sleep(0.2) break while True: try: metadata = metadata_queue.get_nowait() api_call_count += metadata[1] except queue.Empty: break logging.debug("Live procs: %d", live_procs) logging.debug("Inited procs: %d", len(self.inited)) if live_procs <= self.max_parallelism - len(self.inited): alive = False # This prevents any memory leaks from multiple executions, but does kill # the stuck processes very aggressively. It does not cause issues in # execution, but looks very loud to the user if they are watching debug # output. for p in processes: if p.is_alive(): p.terminate() if not alive: logging.debug(f"Total GCS API call count: {api_call_count}") if self.sort_results: return sorted(results) return list(results) def terminate_now( self, processes: "list[multiprocessing.Process]") -> RuntimeError: """Terminates all processes immediately. Args: processes: The full list of multiprocessing processes. Returns: RuntimeError indicating that one or more multiprocess processes has become unresponsive """ for p in processes: p.terminate() raise RuntimeError( "multiprocessing child process became unresponsive; check logs for underlying error" ) def run(self) -> list[tuple[str, int]]: """Runs the controller that manages fast listing. Returns: A sorted list of (str, int) tuples indicating the name and file size of each unique file listed in the listing process. """ # Define the queues. send_work_stealing_needed_queue: multiprocessing.Queue[str] = ( multiprocessing.Queue()) heartbeat_queue: multiprocessing.Queue[str] = multiprocessing.Queue() direct_work_available_queue: multiprocessing.Queue[tuple[str, str]] = ( multiprocessing.Queue()) idle_queue: multiprocessing.Queue[str] = multiprocessing.Queue() unidle_queue: multiprocessing.Queue[str] = multiprocessing.Queue() results_queue: multiprocessing.Queue[set[tuple[str, int]]] = ( multiprocessing.Queue()) metadata_queue: multiprocessing.Queue[tuple[ str, int]] = multiprocessing.Queue() error_queue: multiprocessing.Queue[Exception] = multiprocessing.Queue() processes = [] results: set[tuple[str, int]] = set() for i in range(self.max_parallelism): p = multiprocessing.Process( target=run_list_worker, args=( "dataflux-listing-proc." + str(i), self.gcs_project, self.bucket, send_work_stealing_needed_queue, heartbeat_queue, direct_work_available_queue, idle_queue, unidle_queue, results_queue, metadata_queue, error_queue, "" if i == 0 else None, "" if i == 0 else None, self.retry_config, self.client, self.skip_compose, self.prefix, self.allowed_storage_classes, ), ) processes.append(p) p.start() # Wait before starting the next process to avoid deadlock when multiple processes # attempt to register with the same multiprocessing queue. time.sleep(0.1) while True: time.sleep(0.2) try: e = error_queue.get_nowait() logging.error( f"Got error from child process; exiting. Check child process logs for more details. Error: {e}" ) return self.terminate_now(processes) except queue.Empty: pass alive = False for p in processes: if p.is_alive(): alive = True break new_results = set() while True: try: result = results_queue.get_nowait() new_results.update(result) except queue.Empty: break if len(new_results) > 0: results.update(new_results) logging.debug(f"Result count: {len(results)}") if not alive: break # Update all queues related to tracking process status. self.manage_tracking_queues(idle_queue, unidle_queue, heartbeat_queue) if self.check_crashed_processes(): return self.terminate_now(processes) logging.debug("Inited procs: %d", len(self.inited)) logging.debug("Waiting for work: %d", self.waiting_for_work) if len(self.inited) == self.waiting_for_work and ( self.waiting_for_work > 0): logging.debug("Exiting, all processes are waiting for work") for _ in range(self.max_parallelism * 2): direct_work_available_queue.put((None, None)) break while True: try: result = results_queue.get_nowait() results.update(result) logging.debug(f"Result count: {len(results)}") except queue.Empty: break logging.debug("Got all results, waiting for processes to exit.") return self.cleanup_processes(processes, results_queue, metadata_queue, results)