in src/translation/dags/translation_utils/input_validation_utils.py [0:0]
def normalize_and_validate_config(project_id: str, config: dict) -> dict:
# Normalize source name
if "source" in config:
config["source"] = config["source"].lower()
if "migrationTask" in config:
# Normalize GCS paths in the config by removing trailing slashes.
translationConfigDetails = config["migrationTask"]["translationConfigDetails"]
translationConfigDetails["gcsSourcePath"] = translationConfigDetails[
"gcsSourcePath"
].rstrip("/")
translationConfigDetails["gcsTargetPath"] = translationConfigDetails[
"gcsTargetPath"
].rstrip("/")
# Check that translation input directory is not empty.
if not check_gcs_directory_not_empty(translationConfigDetails["gcsSourcePath"]):
raise AirflowFailException(
f'No translation input files found at gcsSourcePath={translationConfigDetails["gcsSourcePath"]}.'
)
# Check that translation output bucket exists.
if not check_gcs_bucket_exists(translationConfigDetails["gcsTargetPath"]):
raise AirflowFailException(
f'Translation output bucket does not exist at gcsTargetPath={translationConfigDetails["gcsTargetPath"]}.'
)
if "validation_config" in config:
# Note: DVT source names are case-sensitive at present, so it's not easy to normalize them without maintaining a copy of their source name mapping here.
validation_params_file = config["validation_config"][
"validation_params_file_path"
]
# Check that validation params file exists.
if not check_gcs_file_exists(validation_params_file):
raise AirflowFailException(
f"Validation config parameters file not found at validation_params_file_path={validation_params_file}."
)
# Check that secrets exist in Secret Manager.
if "password" in config["validation_config"]["source_config"] and config[
"validation_config"
]["source_config"]["password"].startswith(constants.SECRET_PREFIX):
src_pw_secret = config["validation_config"]["source_config"]["password"]
if not check_secret_access(project_id, src_pw_secret):
raise AirflowFailException(
f"Secret not found in Secret Manager with the name {src_pw_secret}."
)
if "password" in config["validation_config"]["target_config"] and config[
"validation_config"
]["target_config"]["password"].startswith(constants.SECRET_PREFIX):
tgt_pw_secret = config["validation_config"]["target_config"]["password"]
if not check_secret_access(project_id, tgt_pw_secret):
raise AirflowFailException(
f"Secret not found in Secret Manager with the name {tgt_pw_secret}."
)
return config