in projects/dataflow-gcs-to-alloydb/src/dataflow_gcs_to_alloydb.py [0:0]
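# Imports this excerpt relies on (assumed to live at the top of the module,
# which is not shown here):
import re

from apache_beam.io.gcp import gcsio
from google.api_core.client_info import ClientInfo
from google.cloud import storage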
def _assert_files_exist(input_pattern: str):
  """Asserts that at least one file matches the given GCS file pattern.

  Args:
    input_pattern: File pattern glob to validate.

  Raises:
    ValueError: If the pattern is a gs:// path that is malformed or that
      matches no files.
  """
  scheme = input_pattern.split('://')[0]
  if scheme != 'gs':
    return  # Only Cloud Storage paths are validated.
  # Raises ValueError if not a valid gs:// path.
  bucket_name, object_pattern = gcsio.parse_gcs_path(input_pattern)
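  # For example (illustrative values, not from the original file):
  # parse_gcs_path('gs://my-bucket/landing/*.csv')
  #     -> ('my-bucket', 'landing/*.csv')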
  # Split the object pattern into a literal prefix (leading path parts that
  # contain only letters, hyphens and underscores) and a glob for the rest.
  object_parts = object_pattern.split('/')
  for i in range(len(object_parts)):
    if re.search(r'[^a-zA-Z_-]', object_parts[i]):
      break
  prefix = '/'.join(object_parts[:i])
  match_glob = '/'.join(object_parts[i:])
  if prefix and match_glob:
    prefix += '/'
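  # Worked example (hypothetical pattern): for object_pattern
  # 'landing/us-east/2024-*/*.csv', the first part with a character outside
  # [a-zA-Z_-] is '2024-*', so prefix becomes 'landing/us-east/' and
  # match_glob becomes '2024-*/*.csv'.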
  gcs_client = storage.Client(
      client_info=ClientInfo(
          user_agent='cloud-solutions/dataflow-gcs-to-alloydb-v1'
      )
  )
  files = gcs_client.bucket(bucket_name=bucket_name).list_blobs(
      prefix=prefix,
      match_glob=match_glob,
      max_results=1,  # Only need to know that there is at least one file.
  )
  # list_blobs returns an iterator, which is truthy even when it yields
  # nothing, so materialize it before checking for emptiness.
  if not list(files):
    raise ValueError(f'No files matching pattern: {input_pattern}')
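# Usage sketch (illustrative, not part of the original file): this helper is
# meant to fail fast before the pipeline launches, e.g.:
#
#   _assert_files_exist('gs://my-bucket/landing/*.csv')  # OK if files exist
#   _assert_files_exist('/local/data/*.csv')             # skipped: not gs://
#   _assert_files_exist('gs://my-bucket/empty/*.csv')    # raises ValueError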