# _assert_files_exist()
#
# in projects/dataflow-gcs-to-alloydb/src/dataflow_gcs_to_alloydb.py [0:0]


def _assert_files_exist(input_pattern: str):
    """Validates and asserts that there are files on the GCS file pattern.

    Args:
        input_pattern: file pattern glob to validate.

    Raises:
        ValueError: if files do not match.
    """
    scheme = input_pattern.split('://')[0]
    if scheme != 'gs':
        return  # Only Cloud Storage paths are validated.

    # Raises ValueError if not a valid gs:// path.
    bucket_name, object_pattern = gcsio.parse_gcs_path(input_pattern)

    object_parts = object_pattern.split('/')
    for i in range(len(object_parts)):
        if re.compile(r'[^a-zA-Z-_]').search(object_parts[i]):
            break

    prefix = '/'.join(object_parts[:i])
    match_glob = '/'.join(object_parts[i:])
    if prefix and match_glob:
        prefix += '/'

    gcs_client = storage.Client(
        client_info=ClientInfo(
            user_agent='cloud-solutions/dataflow-gcs-to-alloydb-v1'
        )
    )
    files = gcs_client.bucket(bucket_name=bucket_name).list_blobs(
        prefix=prefix,
        match_glob=match_glob,
        max_results=1,  # Only need to know that there is at least one file.
    )

    if not files:
        raise ValueError(f'No files matching pattern: {input_pattern}')