# src/translation/dags/translation_utils/input_validation_utils.py
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
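
"""Input validation utilities for the translation DAGs.

Helpers to check GCS buckets, objects and prefixes, verify Secret Manager
access, and normalize and validate the DAG run configuration.
"""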

from airflow.exceptions import AirflowFailException
from google.api_core.exceptions import NotFound
from google.cloud import secretmanager_v1 as secretmanager
from google.cloud import storage

from common_utils import constants


def check_gcs_bucket_exists(gs_path: str) -> bool:
    """Returns True if the bucket referenced by the GCS path exists."""
    bucket_name, _ = split_gcs_path(gs_path)
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    return bucket.exists()


def check_gcs_file_exists(gs_path: str) -> bool:
    """Returns True if the object referenced by the GCS path exists."""
    bucket_name, file_path = split_gcs_path(gs_path)
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(file_path)
    return blob.exists()


def check_gcs_directory_not_empty(gs_path: str) -> bool:
    """Returns True if the GCS path contains at least one object."""
    bucket_name, directory_path = split_gcs_path(gs_path)
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    if not bucket.exists():
        return False
    # Listing a single blob is enough to know the prefix is non-empty.
    blobs = bucket.list_blobs(prefix=directory_path, max_results=1)
    return any(True for _ in blobs)


def check_secret_access(project_id: str, secret_name: str) -> bool:
    """Returns True if the latest version of the secret can be accessed."""
    client = secretmanager.SecretManagerServiceClient()
    name = f"projects/{project_id}/secrets/{secret_name}/versions/latest"
    try:
        client.access_secret_version(request={"name": name})
        return True
    except NotFound:
        return False


def split_gcs_path(gcs_path: str) -> tuple[str, str]:
    """Splits a GCS path into bucket name and object path.

    Example: "gs://mybucket/dir1/dir2/file1.csv" -> ("mybucket", "dir1/dir2/file1.csv")
    """
    path_parts = gcs_path.removeprefix("gs://").split("/", 1)
    bucket_name = path_parts[0]
    object_path = path_parts[1] if len(path_parts) > 1 else ""
    return bucket_name, object_path
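

# Illustrative shape of the run config consumed by normalize_and_validate_config.
# Only the keys referenced below are shown; the values are hypothetical placeholders:
#
# {
#     "source": "Teradata",
#     "migrationTask": {
#         "translationConfigDetails": {
#             "gcsSourcePath": "gs://my-bucket/translation/input/",
#             "gcsTargetPath": "gs://my-bucket/translation/output/",
#         }
#     },
#     "validation_config": {
#         "validation_params_file_path": "gs://my-bucket/validation_params.csv",
#         "source_config": {"password": constants.SECRET_PREFIX + "my-secret-name"},
#         "target_config": {"password": constants.SECRET_PREFIX + "my-secret-name"},
#     },
# }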
def normalize_and_validate_config(project_id: str, config: dict) -> dict:
    """Normalizes the run config and validates the GCS paths and secrets it references.

    Raises:
        AirflowFailException: if a referenced GCS path or Secret Manager secret is missing.
    """
    # Normalize the source name.
    if "source" in config:
        config["source"] = config["source"].lower()
    if "migrationTask" in config:
        # Normalize GCS paths in the config by removing trailing slashes.
        translation_config = config["migrationTask"]["translationConfigDetails"]
        translation_config["gcsSourcePath"] = translation_config["gcsSourcePath"].rstrip("/")
        translation_config["gcsTargetPath"] = translation_config["gcsTargetPath"].rstrip("/")
        # Check that the translation input directory is not empty.
        if not check_gcs_directory_not_empty(translation_config["gcsSourcePath"]):
            raise AirflowFailException(
                f'No translation input files found at gcsSourcePath={translation_config["gcsSourcePath"]}.'
            )
        # Check that the translation output bucket exists.
        if not check_gcs_bucket_exists(translation_config["gcsTargetPath"]):
            raise AirflowFailException(
                f'Translation output bucket does not exist at gcsTargetPath={translation_config["gcsTargetPath"]}.'
            )
    if "validation_config" in config:
        # Note: DVT source names are case-sensitive at present, so they cannot easily be
        # normalized without maintaining a copy of DVT's source name mapping here.
        validation_config = config["validation_config"]
        validation_params_file = validation_config["validation_params_file_path"]
        # Check that the validation params file exists.
        if not check_gcs_file_exists(validation_params_file):
            raise AirflowFailException(
                f"Validation config parameters file not found at validation_params_file_path={validation_params_file}."
            )
        # Check that any secrets referenced as passwords exist in Secret Manager.
        source_config = validation_config["source_config"]
        if "password" in source_config and source_config["password"].startswith(
            constants.SECRET_PREFIX
        ):
            src_pw_secret = source_config["password"]
            if not check_secret_access(project_id, src_pw_secret):
                raise AirflowFailException(
                    f"Secret not found in Secret Manager with the name {src_pw_secret}."
                )
        target_config = validation_config["target_config"]
        if "password" in target_config and target_config["password"].startswith(
            constants.SECRET_PREFIX
        ):
            tgt_pw_secret = target_config["password"]
            if not check_secret_access(project_id, tgt_pw_secret):
                raise AirflowFailException(
                    f"Secret not found in Secret Manager with the name {tgt_pw_secret}."
                )
    return config
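

# Minimal usage sketch (hypothetical caller): a DAG task would typically pass the
# triggering run's conf and the GCP project id, e.g.
#
#     config = normalize_and_validate_config(project_id="my-project", config=dag_run.conf)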