in tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/common/utils.py [0:0]
def construct_config(storage_client: storage.Client, blob: storage.Blob,
                     config_filename: str) -> Dict:
    """
    Merge config dictionaries found in parent directories.
    The configs closest to gsurl should take precedence.
    """
    gsurl = removesuffix(f"gs://{blob.bucket.name}/{blob.name}",
                         constants.SUCCESS_FILENAME)
    blob = storage.Blob.from_string(gsurl)
    bucket_name = blob.bucket.name
    obj_path = blob.name
    parts = removesuffix(obj_path, "/").split("/")

    def _get_parent_config(path):
        return _get_parent_config_file(storage_client, config_filename,
                                       bucket_name, path)
    config_q: Deque[Dict[str, Any]] = collections.deque()
    if config_filename == constants.BQ_LOAD_CONFIG_FILENAME:
        config_q.append(constants.BASE_LOAD_JOB_CONFIG)

    # Walk from the blob's deepest prefix up toward the bucket root,
    # collecting every matching config file found along the way.
    while parts:
        config = _get_parent_config("/".join(parts))
        if config:
            print(f"found config: {'/'.join(parts)}")
            config_q.append(json.loads(config))
        parts.pop()

    # Recursively merge the collected configs into a single dict.
    merged_config: Dict = {}
    while config_q:
        recursive_update(merged_config, config_q.popleft(), in_place=True)
print(f"merged_config for {config_filename}: {json.dumps(merged_config)}")
if merged_config == constants.BASE_LOAD_JOB_CONFIG:
print("falling back to default CSV load job config. "
"Did you forget load.json?")
return {"load": constants.DEFAULT_LOAD_JOB_CONFIG}
if config_filename == constants.BQ_LOAD_CONFIG_FILENAME:
return {"load": merged_config}
# retuning any other config file that doesn't have the same name as
# constants.BQ_LOAD_CONFIG_FILENAME (default load.json)
return merged_config
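
For reference, here is a minimal, self-contained sketch of the two ideas construct_config relies on: the parent-prefix walk and the recursive config merge. The parent_prefixes and deep_merge helpers, the example bucket path, and the config values are illustrative assumptions; deep_merge is a stand-in for this module's recursive_update, not its actual implementation.

# Illustrative sketch only: parent_prefixes and deep_merge are hypothetical
# helpers, and deep_merge stands in for this module's recursive_update.
from typing import Any, Dict, List


def parent_prefixes(obj_path: str) -> List[str]:
    """Return prefixes of obj_path from deepest to shallowest."""
    parts = obj_path.rstrip("/").split("/")
    prefixes = []
    while parts:
        prefixes.append("/".join(parts))
        parts.pop()
    return prefixes


def deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
    """Recursively merge override into a copy of base; override's keys win."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged


if __name__ == "__main__":
    # For a hypothetical gs://example-bucket/dataset/table/_SUCCESS blob, the
    # function strips the success filename and walks these prefixes:
    print(parent_prefixes("dataset/table/"))  # ['dataset/table', 'dataset']

    # Hypothetical load.json contents found at the two levels.
    dataset_config = {"sourceFormat": "CSV", "skipLeadingRows": 1}
    table_config = {"sourceFormat": "NEWLINE_DELIMITED_JSON"}

    # Per the docstring, the config closest to the blob should take
    # precedence, so merge shallow-to-deep and let the deeper config win.
    merged: Dict[str, Any] = {}
    for cfg in (dataset_config, table_config):
        merged = deep_merge(merged, cfg)
    print(merged)
    # {'sourceFormat': 'NEWLINE_DELIMITED_JSON', 'skipLeadingRows': 1}

Running the sketch layers the deeper prefix's settings over the shallower one's, which is the precedence the docstring describes for the real helper.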