tools/scd-file-to-file/scd_operations.py

from google.cloud import storage
import pandas as pd
import re
import uuid


def scd_update_delete_generation(
    gcs_input_path,
    gcs_output_path,
    primary_key_column,
    scd_column_list,
    effective_from_date_column,
    effective_to_date_column,
    active_flag_column,
    unique_scd_keys_for_generation,
    percentage_for_update_scd_generation,
) -> str:
    """Generates Slowly Changing Dimension (SCD) Type 2 data for updates and deletes.

    This function reads data from a CSV file in Google Cloud Storage (GCS),
    identifies records for updates and deletes based on the provided parameters,
    and writes the generated SCD data to a new CSV file in GCS.

    Args:
        gcs_input_path (str): The GCS path to the input CSV file.
        gcs_output_path (str): The GCS path to the output CSV file.
        primary_key_column (str): The name of the primary key column.
        scd_column_list (str): A comma-separated list of columns to consider for SCD changes.
        effective_from_date_column (str): The name of the column for the effective from date.
        effective_to_date_column (str): The name of the column for the effective to date.
        active_flag_column (str): The name of the column indicating active records.
        unique_scd_keys_for_generation (int): The number of unique keys to use for SCD generation.
        percentage_for_update_scd_generation (float): The percentage of records to use for updates.

    Returns:
        str: The status of the SCD generation, succeeded or failed. On failure,
        the status includes the error message.
    """
    try:
        storage_client = storage.Client()
        df = pd.read_csv(
            gcs_input_path, storage_options={"client": storage_client}, header=0
        )
        # Keep only the active records; copy to avoid SettingWithCopyWarning.
        scd_staging_temp = df[df[active_flag_column] == True].copy()
        # Shuffle the active records and split them into update and delete sets.
        scd_staging_temp = scd_staging_temp.sample(frac=1).reset_index(drop=True)
        split_index = int(len(scd_staging_temp) * percentage_for_update_scd_generation)
        scd_staging = scd_staging_temp[:split_index].copy()
        scd_staging_delete = scd_staging_temp[split_index:].copy()
        scd_staging_delete["action_flag"] = "D"

        # Assign sequential IDs to the update records.
        scd_staging["id"] = scd_staging.reset_index().index + 1
        scd_staging["processed_flag"] = False
        scd_staging["action_flag"] = "U"

        # Get total records
        total_records = len(scd_staging)
        print(
            f"Update SCD Count Generation={total_records}, "
            f"Unique SCD Keys for Generation={unique_scd_keys_for_generation}"
        )

        columns_to_process = scd_column_list.split(",")
        for current_column in columns_to_process:
            # Get unique values for SCD generation
            scd_array = (
                df[current_column]
                .drop_duplicates()
                .head(unique_scd_keys_for_generation)
                .values
            )
            if scd_array.size == 0:  # Handle potential empty array
                scd_array = [None]  # Use a list with None if the array is empty
            array_len = len(scd_array)
            print("For the Column:", current_column)
            print("Array Length:", array_len)  # Print the array length for debugging
            # Ensure batch_size is at least 1.
            batch_size = max(int(total_records / array_len), 1)
            print("Batch Size:", batch_size)  # Print the batch size for debugging

            i = 1
            array_index = 0
            unprocessed_record_count = len(
                scd_staging[scd_staging["processed_flag"] == False]
            )
            # Loop while there are unprocessed records.
            while unprocessed_record_count > 0:
                j = i
                array_value = scd_array[array_index % array_len]
                k = min(i + batch_size - 1, total_records)
                mask = (scd_staging[current_column] != array_value) & (
                    scd_staging["processed_flag"] == False
                )
                if i <= total_records:  # Only apply the id filter if within range
                    mask = mask & (scd_staging["id"] >= j) & (scd_staging["id"] <= k)
                scd_staging.loc[mask, current_column] = array_value
                scd_staging.loc[mask, effective_to_date_column] = pd.Timestamp.now()
                scd_staging.loc[mask, "processed_flag"] = True
                unprocessed_record_count = len(
                    scd_staging[scd_staging["processed_flag"] == False]
                )
                i += batch_size
                array_index += 1
            # Reset the processed flag before handling the next column.
            scd_staging["processed_flag"] = False

        p_status = "SCD Update Delete Generation succeeded"
        # scd_staging = scd_staging.sort_values(by=[primary_key_column])
        scd_staging = scd_staging.drop(
            columns=[
                "id",
                "effective_from_date",
                "effective_to_date",
                "active_flag",
                "processed_flag",
            ]
        )
        print("Records for SCD Update Logic Check")
        # print(scd_staging)  # Print the update DataFrame
        print("Records for SCD Delete Logic Check")
        scd_staging_delete = scd_staging_delete.drop(
            columns=["effective_from_date", "effective_to_date", "active_flag"]
        )
        # scd_staging_delete = scd_staging_delete.sort_values(by=[primary_key_column])
        # print(scd_staging_delete)
        union_scd = pd.concat([scd_staging, scd_staging_delete], ignore_index=True)
        union_scd = union_scd.sort_values(by=[primary_key_column])
        print(union_scd.to_string(index=False))

        # Extract the bucket name from the output path
        gcs_bucket = re.match(r"gs://([^/]+)/", gcs_output_path).group(1)
        # Extract the file name from the output path
        gcs_file_path = re.match(r"gs://([^/]+)/(.*)", gcs_output_path).group(2)
        # Get the bucket and blob (file) objects
        bucket = storage_client.bucket(gcs_bucket)
        blob = bucket.blob(gcs_file_path)
        # Upload the DataFrame to GCS
        blob.upload_from_string(union_scd.to_csv(index=False), content_type="text/csv")
        print(f"DataFrame uploaded to: gs://{gcs_bucket}/{gcs_file_path}")
    except Exception as e:
        # Include the actual error message in the returned status.
        p_status = f"SCD Update Delete Generation Failed: {e}"
    print(p_status)
    return p_status

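
# Illustrative sketch only (not called by the functions in this module): both
# scd_update_delete_generation above and scd_insert_generation below split the
# gs:// output path with the same pair of regular expressions. A hypothetical
# helper such as _split_gcs_path shows that parsing step in one place, assuming
# a well-formed gs://bucket/object path.
def _split_gcs_path(gcs_path):
    """Split a gs://bucket/object path into (bucket_name, object_name)."""
    match = re.match(r"gs://([^/]+)/(.*)", gcs_path)
    if match is None:
        raise ValueError(f"Not a valid GCS object path: {gcs_path}")
    return match.group(1), match.group(2)
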

def scd_insert_generation(
    gcs_input_path,
    gcs_output_path,
    primary_key_column,
    scd_column_list,
    effective_from_date_column,
    effective_to_date_column,
    active_flag_column,
    unique_scd_keys_for_generation,
    number_of_insert_record_count,
) -> str:
    """Generates data for inserting new records into a Slowly Changing Dimension (SCD) Type 2 table.

    This function reads data from a CSV file in Google Cloud Storage (GCS),
    generates new records with unique primary keys and modified SCD columns,
    and writes the generated data to a new CSV file in GCS.

    Args:
        gcs_input_path (str): The GCS path to the input CSV file.
        gcs_output_path (str): The GCS path to the output CSV file.
        primary_key_column (str): The name of the primary key column.
        scd_column_list (str): A comma-separated list of columns to consider for SCD changes.
        effective_from_date_column (str): The name of the column for the effective from date.
        effective_to_date_column (str): The name of the column for the effective to date.
        active_flag_column (str): The name of the column indicating active records.
        unique_scd_keys_for_generation (int): The number of unique keys to use for SCD generation.
        number_of_insert_record_count (int): The number of insert records to generate.

    Returns:
        str: The status of the SCD generation, succeeded or failed. On failure,
        the status includes the error message.
    """
    try:
        storage_client = storage.Client()
        df = pd.read_csv(
            gcs_input_path, storage_options={"client": storage_client}, header=0
        )
        total_records = len(df)
        print(
            f"Insert SCD Count Generation={total_records}, "
            f"Unique SCD Keys for Generation={unique_scd_keys_for_generation}"
        )
        # Shuffle the input records and assign sequential IDs.
        scd_staging = df.sample(frac=1).reset_index(drop=True).copy()
        scd_staging["id"] = scd_staging.reset_index().index + 1
        scd_staging["processed_flag"] = False
        scd_staging["action_flag"] = "I"

        columns_to_process = scd_column_list.split(",")
        for current_column in columns_to_process:
            # Get unique values for SCD generation
            scd_array = (
                df[current_column]
                .drop_duplicates()
                .head(unique_scd_keys_for_generation)
                .values
            )
            if scd_array.size == 0:  # Handle potential empty array
                scd_array = [None]  # Use a list with None if the array is empty
            array_len = len(scd_array)
            print("For the Column:", current_column)
            print("Array Length:", array_len)  # Print the array length for debugging
            # Ensure batch_size is at least 1.
            batch_size = max(int(total_records / array_len), 1)
            print("Batch Size:", batch_size)  # Print the batch size for debugging

            i = 1
            array_index = 0
            unprocessed_record_count = len(
                scd_staging[scd_staging["processed_flag"] == False]
            )
            # Loop while there are unprocessed records.
            while unprocessed_record_count > 0:
                j = i
                array_value = scd_array[array_index % array_len]
                k = min(i + batch_size - 1, total_records)
                mask = (scd_staging[current_column] != array_value) & (
                    scd_staging["processed_flag"] == False
                )
                if i <= total_records:  # Only apply the id filter if within range
                    mask = mask & (scd_staging["id"] >= j) & (scd_staging["id"] <= k)
                scd_staging.loc[mask, current_column] = array_value
                scd_staging.loc[mask, effective_to_date_column] = pd.Timestamp.now()
                scd_staging.loc[mask, "processed_flag"] = True
                unprocessed_record_count = len(
                    scd_staging[scd_staging["processed_flag"] == False]
                )
                i += batch_size
                array_index += 1
            # Reset the processed flag before handling the next column.
            scd_staging["processed_flag"] = False

        p_status = "SCD Insert Generation succeeded"
        # Sort by the primary key column
        scd_staging = scd_staging.sort_values(by=[primary_key_column])
        scd_staging = scd_staging.drop(
            columns=[
                "id",
                "effective_from_date",
                "effective_to_date",
                "active_flag",
                "processed_flag",
            ]
        )
        # Replace the primary keys with freshly generated unique values.
        scd_staging[primary_key_column] = scd_staging.apply(
            lambda _: uuid.uuid4().int, axis=1
        )
        print("Records for SCD Insert Logic Check")
        print(scd_staging.to_string(index=False))

        # Extract the bucket name from the output path
        gcs_bucket = re.match(r"gs://([^/]+)/", gcs_output_path).group(1)
        # Extract the file name from the output path
        gcs_file_path = re.match(r"gs://([^/]+)/(.*)", gcs_output_path).group(2)
        # Get the bucket and blob (file) objects
        bucket = storage_client.bucket(gcs_bucket)
        blob = bucket.blob(gcs_file_path)
        # Upload the DataFrame to GCS
        blob.upload_from_string(
            scd_staging.to_csv(index=False), content_type="text/csv"
        )
        print(f"DataFrame uploaded to: gs://{gcs_bucket}/{gcs_file_path}")
    except Exception as e:
        # Include the actual error message in the returned status.
        p_status = f"SCD Insert Generation Failed: {e}"
    print(p_status)
    return p_status
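

# Minimal usage sketch, assuming the module is executed directly. The bucket,
# file, and column names below are hypothetical placeholders; replace them with
# values that match your input CSV. Note that the hard-coded column drops above
# expect columns literally named effective_from_date, effective_to_date, and
# active_flag. Each function prints and returns its own status string.
if __name__ == "__main__":
    scd_update_delete_generation(
        gcs_input_path="gs://example-bucket/input/customer_dim.csv",
        gcs_output_path="gs://example-bucket/output/customer_update_delete.csv",
        primary_key_column="customer_id",
        scd_column_list="address,phone_number",
        effective_from_date_column="effective_from_date",
        effective_to_date_column="effective_to_date",
        active_flag_column="active_flag",
        unique_scd_keys_for_generation=10,
        percentage_for_update_scd_generation=0.5,
    )
    scd_insert_generation(
        gcs_input_path="gs://example-bucket/input/customer_dim.csv",
        gcs_output_path="gs://example-bucket/output/customer_insert.csv",
        primary_key_column="customer_id",
        scd_column_list="address,phone_number",
        effective_from_date_column="effective_from_date",
        effective_to_date_column="effective_to_date",
        active_flag_column="active_flag",
        unique_scd_keys_for_generation=10,
        number_of_insert_record_count=100,
    )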