src/common_utils/storage_utils.py:

import csv
import json
from pathlib import Path

import pandas as pd
from google.api_core.client_info import ClientInfo
from google.cloud import storage
from google.cloud.storage.blob import Blob

from common_utils import custom_user_agent

validation_csv_header_fields = [
    "translation-type",
    "validation-type",
    "source-table",
    "target-table",
    "source-query-file",
    "target-query-file",
    "filter-status",
    "primary-keys",
    "filters",
    "exclusion-columns",
    "allow-list",
    "count",
    "sum",
    "min",
    "max",
    "avg",
    "grouped-columns",
    "wildcard-include-string-len",
    "cast-to-bigint",
    "threshold",
    "hash",
    "concat",
    "comparison-fields",
    "use-random-row",
    "random-row-batch-size",
]


class StorageUtils:
    """Helper wrapper around the google-cloud-storage client."""

    def __init__(self) -> None:
        # Attach a custom user agent so API calls can be attributed to this tool.
        self.client = storage.Client(
            client_info=ClientInfo(user_agent=custom_user_agent.USER_AGENT)
        )

    def create_bucket_path_notification(self, path, blob_name, topic_name):
        """Create a Pub/Sub notification for OBJECT_FINALIZE events under the given path prefix."""
        bucket_id, path_prefix = self.parse_bucket_and_blob_from_path(path)
        bucket = self.client.bucket(bucket_id)
        notification = bucket.notification(
            topic_name=topic_name,
            blob_name_prefix=append_blob_name_to_path(path_prefix, blob_name),
            event_types=["OBJECT_FINALIZE"],
        )
        notification.create()

    def read_object_from_gcsbucket(self, bucket_id, object_id):
        """Download a JSON object from GCS and return it as a Python object."""
        bucket = self.client.get_bucket(bucket_id)
        blob = storage.Blob(object_id, bucket)
        raw_config = blob.download_as_bytes()
        config = json.loads(raw_config)
        return config

    def write_object_in_gcsbucket(
        self, bucket_id: str, object_name: str, object_content: str
    ):
        """Upload a string to GCS and return the resulting gs:// path."""
        bucket = self.client.bucket(bucket_id)
        blob = bucket.blob(object_name)
        blob.upload_from_string(object_content)
        gs_object_path = f"gs://{bucket_id}/{object_name}"
        return gs_object_path

    def parse_bucket_and_blob_from_path(self, path):
        """Split a gs:// path into (bucket name, blob name)."""
        blob = Blob.from_string(path, client=self.client)
        return blob.bucket.name, blob.name

    def check_object_exist_in_bucket(self, bucket_id: str, object_name: str):
        """Return True if the object exists in the bucket."""
        bucket = self.client.bucket(bucket_id)
        blob = bucket.blob(object_name)
        return blob.exists()

    def get_validation_params_from_gcs(
        self, bucket_id, object_name, translation_type, validation_type
    ):
        """Read a validation config (.csv or .xlsx) from GCS and return the rows matching
        the given translation and validation types, keyed by source table or query file."""
        temp_file = "temporary_file.xlsx"
        dest_file = "validation_params.csv"
        bucket = self.client.get_bucket(bucket_id)
        blob = bucket.blob(object_name)
        file_extension = Path(object_name).suffix
        if file_extension == ".xlsx":
            # Convert Excel input to CSV so both formats share one parsing path.
            blob.download_to_filename(temp_file)
            excel_file = pd.read_excel(temp_file)
            excel_file.to_csv(dest_file, index=None, header=True)
        else:
            blob.download_to_filename(dest_file)
        validation_params_key = (
            "source-table"
            if translation_type in ["ddl", "data"]
            else "source-query-file"
        )
        validation_type = (
            f"custom query {validation_type}"
            if translation_type == "sql"
            else validation_type
        )
        validation_params = {}
        with open(dest_file, encoding="utf-8") as csvf:
            csvReader = csv.DictReader(csvf, fieldnames=validation_csv_header_fields)
            next(csvReader)
            next(csvReader)  # skip the first two lines from csv as headers
            for rows in csvReader:
                key = rows[validation_params_key]
                if (
                    translation_type == rows["translation-type"]
                    and validation_type == rows["validation-type"]
                ):
                    validation_params[key] = rows
        return validation_params


def append_blob_name_to_path(path, blob_name):
    """Join a path prefix and a blob name with exactly one slash."""
    if path[-1] != "/":
        return f"{path}/{blob_name}"
    return f"{path}{blob_name}"
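
A minimal usage sketch of the class above, assuming application default credentials are configured and the `common_utils.storage_utils` import path; the bucket and object names are hypothetical placeholders, not values taken from this module:

    import json

    from common_utils.storage_utils import StorageUtils

    # Hypothetical bucket/object names, for illustration only.
    storage_utils = StorageUtils()

    if storage_utils.check_object_exist_in_bucket("example-config-bucket", "config/settings.json"):
        # Returns the parsed JSON content of the object.
        config = storage_utils.read_object_from_gcsbucket("example-config-bucket", "config/settings.json")
        # Returns the gs:// path of the written object.
        output_path = storage_utils.write_object_in_gcsbucket(
            "example-output-bucket", "results/settings_copy.json", json.dumps(config)
        )
        print(output_path)  # gs://example-output-bucket/results/settings_copy.json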