# dataflux_core/download.py
"""
Copyright 2024 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from __future__ import annotations
import itertools
import logging
import math
import multiprocessing
import queue
import signal
import sys
import threading
import uuid
from typing import Iterator
from google.api_core.client_info import ClientInfo
from google.cloud import storage
from google.cloud.storage.retry import DEFAULT_RETRY
from dataflux_core import user_agent
# https://cloud.google.com/storage/docs/retry-strategy#python.
MODIFIED_RETRY = DEFAULT_RETRY.with_deadline(300.0).with_delay(initial=1.0,
multiplier=1.2,
maximum=45.0)
# https://cloud.google.com/storage/docs/composite-objects.
MAX_NUM_OBJECTS_TO_COMPOSE = 32
COMPOSED_PREFIX = "dataflux-composed-objects/"
# Tracks the in-flight composite object so the SIGINT handler can delete it
# if the process is interrupted mid-download.
current_composed_object = None
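# Callers can override MODIFIED_RETRY with their own policy. A minimal sketch,
# assuming only the retry helpers already imported above (the values are
# illustrative, not recommendations):
#
#     custom_retry = DEFAULT_RETRY.with_deadline(600.0).with_delay(
#         initial=2.0, multiplier=2.0, maximum=60.0)
#     # Pass retry_config=custom_retry to any function in this module.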
def compose(
project_name: str,
bucket_name: str,
destination_blob_name: str,
objects: list[tuple[str, int]],
storage_client: object = None,
retry_config: "google.api_core.retry.retry_unary.Retry" = MODIFIED_RETRY,
) -> object:
"""Compose the objects into a composite object, upload the composite object to the GCS bucket and returns it.
Args:
project_name: the name of the GCP project.
bucket_name: the name of the GCS bucket that holds the objects to compose.
The function uploads the composed object to this bucket too.
destination_blob_name: the name of the composite object to be created.
objects: A list of tuples which indicate the object names and sizes (in bytes) in the bucket.
Example: [("object_name_A", 1000), ("object_name_B", 2000)]
storage_client: the google.cloud.storage.Client initialized with the project.
If not defined, the function will initialize the client with the project_name.
retry_config: The retry parameter to supply to the compose objects call.
Returns:
the "blob" of the composed object.
"""
if len(objects) > MAX_NUM_OBJECTS_TO_COMPOSE:
raise ValueError(
f"at most {MAX_NUM_OBJECTS_TO_COMPOSE} objects can be composed in a single operation, received {len(objects)} objects."
)
if storage_client is None:
storage_client = storage.Client(project=project_name)
user_agent.add_dataflux_user_agent(storage_client)
bucket = storage_client.bucket(bucket_name)
destination = bucket.blob(destination_blob_name)
sources = [bucket.blob(blob_name) for blob_name, _ in objects]
destination.compose(sources, retry=retry_config)
return destination
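# Example usage of compose() (a minimal sketch; the project, bucket, and object
# names are placeholders):
#
#     client = storage.Client(project="my-project")
#     blob = compose(
#         project_name="my-project",
#         bucket_name="my-bucket",
#         destination_blob_name=COMPOSED_PREFIX + str(uuid.uuid4()),
#         objects=[("object_name_A", 1000), ("object_name_B", 2000)],
#         storage_client=client,
#     )
#     # blob is the google.cloud.storage.Blob of the new composite object.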
def decompose(
project_name: str,
bucket_name: str,
composite_object_name: str,
objects: list[tuple[str, int]],
storage_client: object = None,
retry_config: "google.api_core.retry.retry_unary.Retry" = MODIFIED_RETRY,
) -> list[bytes]:
"""Decompose the composite objects and return the decomposed objects contents in bytes.
Args:
project_name: the name of the GCP project.
bucket_name: the name of the GCS bucket that holds the objects to compose.
The function uploads the the composed object to this bucket too.
composite_object_name: the name of the composite object to be created.
objects: A list of tuples which indicate the object names and sizes (in bytes) in the bucket.
Example: [("object_name_A", 1000), ("object_name_B", 2000)]
storage_client: the google.cloud.storage.Client initialized with the project.
If not defined, the function will initialize the client with the project_name.
retry_config: The retry parameter supplied to the download_as_bytes call.
Returns:
the contents (in bytes) of the decomposed objects.
"""
if storage_client is None:
storage_client = storage.Client(project=project_name)
user_agent.add_dataflux_user_agent(storage_client)
res = []
composed_object_content = download_single(
storage_client,
bucket_name,
composite_object_name,
retry_config=retry_config,
)
start = 0
for each_object in objects:
blob_size = each_object[1]
content = composed_object_content[start:start + blob_size]
res.append(content)
start += blob_size
if start != len(composed_object_content):
logging.error(
"total size of decomposed objects = %s bytes, size of composite object = %s bytes.",
start,
len(composed_object_content),
)
return res
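# Example usage of decompose() (a minimal sketch continuing the compose()
# example above; names and sizes are placeholders):
#
#     parts = decompose(
#         project_name="my-project",
#         bucket_name="my-bucket",
#         composite_object_name=blob.name,
#         objects=[("object_name_A", 1000), ("object_name_B", 2000)],
#         storage_client=client,
#     )
#     # parts[0] holds the 1000 bytes of object_name_A, parts[1] the 2000
#     # bytes of object_name_B.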
def download_single(
storage_client: object,
bucket_name: str,
object_name: str,
retry_config: "google.api_core.retry.retry_unary.Retry" = MODIFIED_RETRY,
) -> bytes:
"""Download the contents of this object as a bytes object and return it.
Args:
storage_client: the google.cloud.storage.Client initialized with the project.
bucket_name: the name of the GCS bucket that holds the object.
object_name: the name of the object to download.
retry_config: The retry parameter supplied to the download_as_bytes call.
Returns:
the contents of the object in bytes.
"""
bucket_handle = storage_client.bucket(bucket_name)
blob = bucket_handle.blob(object_name)
return blob.download_as_bytes(retry=retry_config)
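# Example usage of download_single() (a minimal sketch; names are placeholders):
#
#     client = storage.Client(project="my-project")
#     data = download_single(client, "my-bucket", "object_name_A")
#     # data is the full object content as bytes.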
class DataFluxDownloadOptimizationParams:
"""Parameters used to optimize DataFlux download performance.
Attributes:
max_composite_object_size: An integer indicating the maximum size (in bytes) of a composite object.
"""
def __init__(self, max_composite_object_size):
self.max_composite_object_size = max_composite_object_size
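# Example (a minimal sketch): cap composite objects at 100 MiB, a value chosen
# purely for illustration. This params object is reused in the examples below.
#
#     params = DataFluxDownloadOptimizationParams(
#         max_composite_object_size=100 * 1024 * 1024)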
def df_download_thread(
results_queue: queue.Queue[list[bytes]],
project_name: str,
bucket_name: str,
objects: list[tuple[str, int]],
storage_client: object = None,
dataflux_download_optimization_params:
DataFluxDownloadOptimizationParams = None,
retry_config=MODIFIED_RETRY,
):
"""Threading helper that calls dataflux_download and places results onto queue.
Args:
results_queue: the queue on which to put all download results.
project_name: the name of the GCP project.
bucket_name: the name of the GCS bucket that holds the objects to compose.
The function uploads the composed object to this bucket too.
objects: A list of tuples which indicate the object names and sizes (in bytes) in the bucket.
Example: [("object_name_A", 1000), ("object_name_B", 2000)]
storage_client: the google.cloud.storage.Client initialized with the project.
If not defined, the function will initialize the client with the project_name.
dataflux_download_optimization_params: the parameters used to optimize the download performance.
retry_config: The retry configuration to pass to all retryable download operations.
"""
result = dataflux_download(
project_name,
bucket_name,
objects,
storage_client,
dataflux_download_optimization_params,
# Always signify threading enabled so that signal handling is disabled.
threading_enabled=True,
retry_config=retry_config,
)
results_queue.put(result)
def dataflux_download_threaded(
project_name: str,
bucket_name: str,
objects: list[tuple[str, int]],
storage_client: object = None,
dataflux_download_optimization_params:
DataFluxDownloadOptimizationParams = None,
threads: int = 1,
retry_config=MODIFIED_RETRY,
) -> list[bytes]:
"""Perform the DataFlux download algorithm threaded to performantly download the object contents as bytes and return.
Args:
project_name: the name of the GCP project.
bucket_name: the name of the GCS bucket that holds the objects to compose.
The function uploads the composed object to this bucket too.
objects: A list of tuples which indicate the object names and sizes (in bytes) in the bucket.
Example: [("object_name_A", 1000), ("object_name_B", 2000)]
storage_client: the google.cloud.storage.Client initialized with the project.
If not defined, the function will initialize the client with the project_name.
dataflux_download_optimization_params: the parameters used to optimize the download performance.
threads: The number of threads across which to split the download.
retry_config: The retry configuration to pass to all retryable download operations.
Returns:
the contents of the objects in bytes.
"""
chunk_size = math.ceil(len(objects) / threads)
chunks = []
for i in range(threads):
chunk = objects[i * chunk_size:(i + 1) * chunk_size]
if chunk:
chunks.append(chunk)
results_queues = [queue.Queue() for _ in chunks]
thread_list = []
for i, chunk in enumerate(chunks):
thread = threading.Thread(
target=df_download_thread,
args=(
results_queues[i],
project_name,
bucket_name,
chunk,
storage_client,
dataflux_download_optimization_params,
retry_config,
),
)
thread_list.append(thread)
thread.start()
for thread in thread_list:
thread.join()
results = []
for q in results_queues:
while not q.empty():
results.extend(q.get())
return results
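# Example usage of dataflux_download_threaded() (a minimal sketch; names and the
# thread count are placeholders, and params is the
# DataFluxDownloadOptimizationParams instance from the example above):
#
#     contents = dataflux_download_threaded(
#         project_name="my-project",
#         bucket_name="my-bucket",
#         objects=[("object_name_A", 1000), ("object_name_B", 2000)],
#         dataflux_download_optimization_params=params,
#         threads=4,
#     )
#     # contents is a list of bytes objects, one per input object.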
def dataflux_download_parallel(
project_name: str,
bucket_name: str,
objects: list[tuple[str, int]],
storage_client: object = None,
dataflux_download_optimization_params:
DataFluxDownloadOptimizationParams = None,
parallelization: int = 1,
retry_config=MODIFIED_RETRY,
) -> list[bytes]:
"""Perform the DataFlux download algorithm in parallel to download the object contents as bytes and return.
Args:
project_name: the name of the GCP project.
bucket_name: the name of the GCS bucket that holds the objects to compose.
The function uploads the composed object to this bucket too.
objects: A list of tuples which indicate the object names and sizes (in bytes) in the bucket.
Example: [("object_name_A", 1000), ("object_name_B", 2000)]
storage_client: the google.cloud.storage.Client initialized with the project.
If not defined, the function will initialize the client with the project_name.
dataflux_download_optimization_params: the parameters used to optimize the download performance.
parallelization: The number of parallel processes that will simultaneously execute the download.
retry_config: The retry configuration to pass to all retryable download operations.
Returns:
the contents of the objects in bytes.
"""
chunk_size = math.ceil(len(objects) / parallelization)
chunks = []
for i in range(parallelization):
chunk = objects[i * chunk_size:(i + 1) * chunk_size]
if chunk:
chunks.append(chunk)
with multiprocessing.Pool(processes=len(chunks)) as pool:
results = pool.starmap(
dataflux_download,
((
project_name,
bucket_name,
chunk,
storage_client,
dataflux_download_optimization_params,
False,
retry_config,
) for chunk in chunks),
)
return list(itertools.chain.from_iterable(results))
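# Example usage of dataflux_download_parallel() (a minimal sketch; names and the
# process count are placeholders, and params is from the example above). Leaving
# storage_client as None lets each worker process construct its own client.
#
#     contents = dataflux_download_parallel(
#         project_name="my-project",
#         bucket_name="my-bucket",
#         objects=[("object_name_A", 1000), ("object_name_B", 2000)],
#         dataflux_download_optimization_params=params,
#         parallelization=4,
#     )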
def dataflux_download(
project_name: str,
bucket_name: str,
objects: list[tuple[str, int]],
storage_client: object = None,
dataflux_download_optimization_params:
DataFluxDownloadOptimizationParams = None,
threading_enabled=False,
retry_config=MODIFIED_RETRY,
) -> list[bytes]:
"""Perform the DataFlux download algorithm to download the object contents as bytes and return.
Args:
project_name: the name of the GCP project.
bucket_name: the name of the GCS bucket that holds the objects to compose.
The function uploads the composed object to this bucket too.
objects: A list of tuples which indicate the object names and sizes (in bytes) in the bucket.
Example: [("object_name_A", 1000), ("object_name_B", 2000)]
storage_client: the google.cloud.storage.Client initialized with the project.
If not defined, the function will initialize the client with the project_name.
dataflux_download_optimization_params: the parameters used to optimize the download performance.
threading_enabled: True when called from a worker thread; disables registration of the SIGINT cleanup handler.
retry_config: The retry configuration to pass to all retryable download operations.
Returns:
the contents of the object in bytes.
"""
if storage_client is None:
storage_client = storage.Client(project=project_name)
user_agent.add_dataflux_user_agent(storage_client)
res = []
max_composite_object_size = (
dataflux_download_optimization_params.max_composite_object_size)
i = 0
# Register the cleanup signal handler for SIGINT.
if not threading_enabled:
signal.signal(signal.SIGINT, term_signal_handler)
global current_composed_object
while i < len(objects):
curr_object_name = objects[i][0]
curr_object_size = objects[i][1]
if curr_object_size > max_composite_object_size:
# Download the single object.
curr_object_content = download_single(
storage_client=storage_client,
bucket_name=bucket_name,
object_name=curr_object_name,
retry_config=retry_config,
)
res.append(curr_object_content)
i += 1
else:
# Dynamically compose and decompose based on the object size.
objects_slice = []
curr_size = 0
while (i < len(objects) and curr_size <= max_composite_object_size
and len(objects_slice) < MAX_NUM_OBJECTS_TO_COMPOSE):
curr_size += objects[i][1]
objects_slice.append(objects[i])
i += 1
if len(objects_slice) == 1:
object_name = objects_slice[0][0]
curr_object_content = download_single(
storage_client=storage_client,
bucket_name=bucket_name,
object_name=object_name,
retry_config=retry_config,
)
res.append(curr_object_content)
else:
# If the slice contains more than one object, compose, download, decompose, and then delete the composite object.
# A unique composite name avoids concurrent mutation of the same object across processes.
composed_object_name = COMPOSED_PREFIX + str(uuid.uuid4())
composed_object = compose(
project_name,
bucket_name,
composed_object_name,
objects_slice,
storage_client,
retry_config=retry_config,
)
current_composed_object = composed_object
res.extend(
decompose(
project_name,
bucket_name,
composed_object_name,
objects_slice,
storage_client,
retry_config=retry_config,
))
try:
composed_object.delete(retry=retry_config)
current_composed_object = None
except Exception as e:
logging.exception(
"exception while deleting the composite object: %s", e)
return res
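# Example usage of dataflux_download() (a minimal sketch; names are placeholders
# and params is the DataFluxDownloadOptimizationParams instance from the example
# above):
#
#     contents = dataflux_download(
#         project_name="my-project",
#         bucket_name="my-bucket",
#         objects=[("object_name_A", 1000), ("object_name_B", 2000)],
#         dataflux_download_optimization_params=params,
#     )
#     # Objects larger than params.max_composite_object_size are fetched
#     # individually; smaller ones are batched via compose()/decompose().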
def dataflux_download_lazy(
project_name: str,
bucket_name: str,
objects: list[tuple[str, int]],
storage_client: object = None,
dataflux_download_optimization_params:
DataFluxDownloadOptimizationParams = None,
threading_enabled=False,
retry_config: "google.api_core.retry.retry_unary.Retry" = MODIFIED_RETRY,
) -> Iterator[bytes]:
"""Perform the DataFlux download algorithm to download the object contents as bytes in a lazy fashion.
Args:
project_name: the name of the GCP project.
bucket_name: the name of the GCS bucket that holds the objects to compose.
The function uploads the composed object to this bucket too.
objects: A list of tuples which indicate the object names and sizes (in bytes) in the bucket.
Example: [("object_name_A", 1000), ("object_name_B", 2000)]
storage_client: the google.cloud.storage.Client initialized with the project.
If not defined, the function will initialize the client with the project_name.
dataflux_download_optimization_params: the parameters used to optimize the download performance.
threading_enabled: True when called from a worker thread; disables registration of the SIGINT cleanup handler.
retry_config: The retry configuration to pass to all retryable download operations.
Returns:
An iterator over the contents of each object in bytes.
"""
if storage_client is None:
storage_client = storage.Client(project=project_name)
user_agent.add_dataflux_user_agent(storage_client)
max_composite_object_size = (
dataflux_download_optimization_params.max_composite_object_size)
i = 0
# Register the cleanup signal handler for SIGINT.
if not threading_enabled:
signal.signal(signal.SIGINT, term_signal_handler)
global current_composed_object
while i < len(objects):
curr_object_name = objects[i][0]
curr_object_size = objects[i][1]
if curr_object_size > max_composite_object_size:
# Download the single object.
curr_object_content = download_single(
storage_client=storage_client,
bucket_name=bucket_name,
object_name=curr_object_name,
retry_config=retry_config,
)
yield curr_object_content
i += 1
else:
# Dynamically compose and decompose based on the object size.
objects_slice = []
curr_size = 0
while (i < len(objects) and curr_size <= max_composite_object_size
and len(objects_slice) < MAX_NUM_OBJECTS_TO_COMPOSE):
curr_size += objects[i][1]
objects_slice.append(objects[i])
i += 1
if len(objects_slice) == 1:
object_name = objects_slice[0][0]
curr_object_content = download_single(
storage_client=storage_client,
bucket_name=bucket_name,
object_name=object_name,
retry_config=retry_config,
)
yield curr_object_content
else:
# If the slice contains more than one object, compose, download, decompose, and then delete the composite object.
# A unique composite name avoids concurrent mutation of the same object across processes.
composed_object_name = COMPOSED_PREFIX + str(uuid.uuid4())
composed_object = compose(
project_name,
bucket_name,
composed_object_name,
objects_slice,
storage_client,
retry_config=retry_config,
)
current_composed_object = composed_object
yield from decompose(
project_name,
bucket_name,
composed_object_name,
objects_slice,
storage_client,
retry_config=retry_config,
)
try:
composed_object.delete(retry=retry_config)
current_composed_object = None
except Exception as e:
logging.exception(
"exception while deleting the composite object: %s", e)
def clean_composed_object(composed_object):
"""Delete the given composite object, logging (but not raising) any failure."""
if composed_object:
try:
composed_object.delete(retry=MODIFIED_RETRY)
except Exception as e:
logging.exception(
"exception while deleting composite object: %s", e)
def term_signal_handler(signal_num, frame):
"""SIGINT handler that deletes any in-flight composite object before exiting."""
print("Ctrl+C interrupt detected. Cleaning up and exiting...")
clean_composed_object(current_composed_object)
sys.exit(0)