in tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/common/utils.py [0:0]
def get_batches_for_gsurl(gcs_client: storage.Client,
                          gsurl: str,
                          recursive: bool = True) -> List[List[str]]:
"""
This function creates batches of GCS uris for a given gsurl.
By default, it will recursively search for blobs in all sub-folders of the
given gsurl.
The function will ignore uris of objects which match the following:
- filenames which are present in constants.ACTION_FILENAMES
- filenames that start with a dot (.)
- _bqlock file created for ordered loads
- filename contains any constant.SPECIAL_GCS_DIRECTORY_NAMES in their path
returns an Array of their batches
(one batch has an array of multiple GCS uris)
"""
batches: List[List[str]] = []
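    # Split gs://<bucket>/<prefix> into its bucket name and object prefix, then
    # resolve the bucket handle (cached to avoid repeated bucket lookups).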
parsed_url = urlparse(gsurl)
bucket_name: str = parsed_url.netloc
prefix_path: str = parsed_url.path.lstrip('/')
bucket: storage.Bucket = cached_get_bucket(gcs_client, bucket_name)
folders: Set[str] = get_folders_in_gcs_path_prefix(gcs_client,
bucket,
prefix_path,
recursive=recursive)
folders.add(prefix_path)
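    # Emit a single-line JSON entry; when run as a Cloud Function this is
    # captured by Cloud Logging as structured jsonPayload.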
print(
json.dumps(
dict(message="Searching for blobs to load in"
" prefix path and sub-folders",
search_folders=list(folders),
severity="INFO")))
blobs: List[storage.Blob] = []
for folder in folders:
blobs += (list(
gcs_client.list_blobs(bucket, prefix=folder, delimiter="/")))
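    # Batches are bounded by total bytes (MAX_BATCH_BYTES env var, defaulting
    # to constants.DEFAULT_MAX_BATCH_BYTES) and by the number of source URIs
    # allowed in a single load job (constants.MAX_SOURCE_URIS_PER_LOAD).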
cumulative_bytes = 0
max_batch_size = int(
os.getenv("MAX_BATCH_BYTES", constants.DEFAULT_MAX_BATCH_BYTES))
batch: List[str] = []
for blob in blobs:
# The following blobs will be ignored:
# - filenames which are present in constants.ACTION_FILENAMES
# - filenames that start with a dot (.)
# - _bqlock file created for ordered loads
# - filenames with constants.SPECIAL_GCS_DIRECTORY_NAMES in their path
        basename = os.path.basename(blob.name)
        if (basename not in constants.ACTION_FILENAMES and
                not basename.startswith(".") and
                basename != "_bqlock" and
                not any(blob_dir_name in constants.SPECIAL_GCS_DIRECTORY_NAMES
                        for blob_dir_name in blob.name.split('/'))):
if blob.size == 0: # ignore empty files
print(f"ignoring empty file: gs://{bucket.name}/{blob.name}")
continue
            cumulative_bytes += blob.size
            # keep adding to the current batch while it stays within both the
            # byte-size and source-URI-count thresholds
            if (cumulative_bytes <= max_batch_size and
                    len(batch) < constants.MAX_SOURCE_URIS_PER_LOAD):
                batch.append(f"gs://{bucket_name}/{blob.name}")
            else:
                # a threshold was reached: close out the current batch and
                # start a new one seeded with this blob
                batches.append(batch.copy())
                batch.clear()
                batch.append(f"gs://{bucket_name}/{blob.name}")
                cumulative_bytes = blob.size
# pick up remaining files in the final batch
if len(batch) > 0:
batches.append(batch.copy())
batch.clear()
print(
json.dumps(
dict(message="Logged batches of blobs to load in jsonPayload.",
batches=batches)))
if len(batches) > 1:
print(f"split into {len(batches)} batches.")
elif len(batches) < 1:
raise google.api_core.exceptions.NotFound(
f"No files to load at {gsurl}!")
return batches
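

# Illustrative sketch only (an assumption, not this module's real entry point):
# callers typically submit one BigQuery load job per returned batch, since each
# batch is sized to fit a single job's source URI limits. The helper below and
# its table/job-config values are hypothetical placeholders for the example.
def _example_load_each_batch(gsurl: str, table: str) -> None:
    """Sketch: submit one BigQuery load job per batch from get_batches_for_gsurl."""
    from google.cloud import bigquery  # local import; example only

    gcs_client = storage.Client()
    bq_client = bigquery.Client()
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV)  # placeholder format
    for uri_batch in get_batches_for_gsurl(gcs_client, gsurl):
        # each batch respects the byte and source-URI thresholds above, so it
        # can be passed directly as one load job's source URIs.
        bq_client.load_table_from_uri(uri_batch, table,
                                      job_config=job_config).result()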