in tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/common/utils.py [0:0]
def get_table_prefix(gcs_client: storage.Client, blob: storage.Blob) -> str:
    """Find the table prefix for an object id based on the destination regex.

    Control files (the backfill, start-backfill and "_bqlock" markers) are
    handled first: their prefix is simply the object name with the trailing
    "/<basename>" removed. For every other object the destination regex is
    matched against the object name and the prefix is everything up to the end
    of the regex's "table" named group.

    Args:
        gcs_client: storage.Client passed through to construct_config.
        blob: storage.Blob whose name should be parsed.
    Returns:
        str: table prefix, without a trailing slash.
    Raises:
        exceptions.DestinationRegexMatchException: if the object name does not
            match the destination regex, or the regex does not yield a
            "table" group for this object name.
    """
    basename = os.path.basename(blob.name)
    if basename in {
            constants.BACKFILL_FILENAME,
            constants.START_BACKFILL_FILENAME,
            "_bqlock",
    }:
        # These files will not match the regex and always should appear at the
        # table level.
        return removesuffix(blob.name, f"/{basename}")

    # Fall back to an empty mapping so that a missing/empty 'load' section
    # uses the default regex instead of leaving destination_regex undefined.
    load_config = construct_config(
        gcs_client, blob, constants.BQ_LOAD_CONFIG_FILENAME).get('load') or {}
    destination_regex = load_config.get('destinationRegex',
                                        constants.DESTINATION_REGEX)
    print(f"Retrieved DESTINATION_REGEX: {destination_regex}")

    # Matching is done on the name with "/_backlog/" collapsed to "/".
    # NOTE(review): the offset found below is applied to the original
    # blob.name; that is only equivalent when "/_backlog/" occurs after the
    # table group - confirm against the bucket layout.
    match = re.compile(destination_regex).match(
        blob.name.replace("/_backlog/", "/"))
    if not match:
        raise exceptions.DestinationRegexMatchException(
            f"could not determine table prefix for object id: {blob.name} "
            "because it did not contain a match for destination_regex: "
            f"{destination_regex}")

    if "table" in match.re.groupindex:
        # end() is -1 when the group exists in the pattern but did not take
        # part in this match (e.g. an optional group); slicing with -1 would
        # silently drop the last character instead of failing.
        table_level_index = match.end("table")
        if table_level_index >= 0:
            table_prefix = blob.name[:table_level_index].rstrip('/')
            print(f"{table_prefix=}")
            return table_prefix

    raise exceptions.DestinationRegexMatchException(
        f"could not determine table prefix for object id: {blob.name} "
        "because it did not contain a match for the table capturing group "
        f"in destination regex: {destination_regex}")