def get_batches_for_gsurl()

in tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/common/utils.py


def get_batches_for_gsurl(gcs_client: storage.Client,
                          gsurl: str,
                          recursive=True) -> List[List[str]]:
    """
    This function creates batches of GCS URIs for a given gsurl.
    By default, it recursively searches for blobs in all sub-folders of the
    given gsurl.
    The function ignores URIs of objects that match any of the following:
      - filenames present in constants.ACTION_FILENAMES
      - filenames that start with a dot (.)
      - the _bqlock file created for ordered loads
      - filenames with any constants.SPECIAL_GCS_DIRECTORY_NAMES entry in
        their path
    Returns a list of batches (each batch is a list of GCS URIs).
    """
    batches: List[List[str]] = []
    parsed_url = urlparse(gsurl)
    bucket_name: str = parsed_url.netloc
    prefix_path: str = parsed_url.path.lstrip('/')

    bucket: storage.Bucket = cached_get_bucket(gcs_client, bucket_name)
    folders: Set[str] = get_folders_in_gcs_path_prefix(gcs_client,
                                                       bucket,
                                                       prefix_path,
                                                       recursive=recursive)
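    # Include the top-level prefix itself so blobs directly under gsurl are
    # listed as well, not only those in its sub-folders.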
    folders.add(prefix_path)
    print(
        json.dumps(
            dict(message="Searching for blobs to load in"
                 " prefix path and sub-folders",
                 search_folders=list(folders),
                 severity="INFO")))
    blobs: List[storage.Blob] = []
    for folder in folders:
        blobs += list(
            gcs_client.list_blobs(bucket, prefix=folder, delimiter="/"))
    cumulative_bytes = 0
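    # The per-batch byte cap defaults to constants.DEFAULT_MAX_BATCH_BYTES and
    # can be overridden with the MAX_BATCH_BYTES environment variable.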
    max_batch_size = int(
        os.getenv("MAX_BATCH_BYTES", constants.DEFAULT_MAX_BATCH_BYTES))
    batch: List[str] = []
    for blob in blobs:
        # The following blobs will be ignored:
        #   - filenames which are present in constants.ACTION_FILENAMES
        #   - filenames that start with a dot (.)
        #   - _bqlock file created for ordered loads
        #   - filenames with constants.SPECIAL_GCS_DIRECTORY_NAMES in their path
        if (os.path.basename(blob.name) not in constants.ACTION_FILENAMES and
                not os.path.basename(blob.name).startswith(".") and
                os.path.basename(blob.name) != "_bqlock" and
                not any(blob_dir_name in constants.SPECIAL_GCS_DIRECTORY_NAMES
                        for blob_dir_name in blob.name.split('/'))):
            if blob.size == 0:  # ignore empty files
                print(f"ignoring empty file: gs://{bucket.name}/{blob.name}")
                continue
            cumulative_bytes += blob.size

            # keep adding to the current batch until it hits the byte cap or
            # the per-load source URI limit
            if (cumulative_bytes <= max_batch_size and
                    len(batch) < constants.MAX_SOURCE_URIS_PER_LOAD):
                batch.append(f"gs://{bucket_name}/{blob.name}")
            else:
                batches.append(batch.copy())
                batch.clear()
                batch.append(f"gs://{bucket_name}/{blob.name}")
                cumulative_bytes = blob.size
    # pick up remaining files in the final batch
    if len(batch) > 0:
        batches.append(batch.copy())
        batch.clear()

    print(
        json.dumps(
            dict(message="Logged batches of blobs to load in jsonPayload.",
                 batches=batches)))

    if len(batches) > 1:
        print(f"split into {len(batches)} batches.")
    elif len(batches) < 1:
        raise google.api_core.exceptions.NotFound(
            f"No files to load at {gsurl}!")
    return batches
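
A minimal usage sketch (not part of utils.py), assuming the module's
google.cloud.storage import is in scope and that the bucket and prefix below
are hypothetical placeholders the caller is allowed to list:

# Sketch only: bucket and prefix are hypothetical placeholders.
from google.cloud import storage

gcs_client = storage.Client()
batches = get_batches_for_gsurl(
    gcs_client, "gs://my-ingest-bucket/my_dataset/my_table/")
for batch_num, source_uris in enumerate(batches):
    # Each batch stays within the byte cap and the per-load source URI limit,
    # so it can be submitted as a single BigQuery load job.
    print(f"batch {batch_num}: {len(source_uris)} source URIs")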