tools/scd-bq-to-bq/main.py (44 lines of code) (raw):
from gcs_ops import export_bigquery_to_gcs
import settings # Imports from scd_project/config/settings.py
import scd_operations
import uuid
# Generate a unique batch id for each execution so that every run writes its
# files under a distinct GCS prefix.
batch_id = uuid.uuid4()
print(f"batch_id={batch_id}")

# Name of the Google Cloud Storage bucket for storing intermediate and output data.
gcs_bucket_name = settings.GCS_BUCKET_NAME

# Every object for this run lives under gs://<bucket>/scd/<batch_id>/.
gcs_input_path = f"gs://{gcs_bucket_name}/scd/{batch_id}/input.csv"
gcs_update_delete_output_path = (
    f"gs://{gcs_bucket_name}/scd/{batch_id}/output_update_delete.csv"
)
gcs_insert_output_path = f"gs://{gcs_bucket_name}/scd/{batch_id}/output_insert_scd.csv"

# Hand the project id, the input table name and this run's input path to
# export_bigquery_to_gcs (presumably exports the table to that GCS object;
# confirm the output format in gcs_ops).
# NOTE(review): this call executes at import time, not only when the file is
# run as a script - confirm that is intended.
export_bigquery_to_gcs(
    settings.PROJECT_ID, settings.BQ_INPUT_TABLE_NAME, gcs_input_path
)
def run_scd_processing():
    """
    Run the two SCD generation stages in order and print each stage's status.

    The update/delete stage runs first and writes to
    ``gcs_update_delete_output_path``; the insert stage runs second and writes
    to ``gcs_insert_output_path``. Both stages read ``gcs_input_path`` and take
    their column configuration from ``settings``. Returns ``None``.
    """

    def shared_arguments():
        # Keyword arguments common to both stages. Rebuilt on every call so
        # each stage reads the settings values at the moment it is invoked.
        return {
            "gcs_input_path": gcs_input_path,  # Source for sampling new records
            "primary_key_column": settings.PRIMARY_KEY_COLUMN,
            "scd_column_list": settings.SCD_COLUMN_LIST,
            "effective_from_date_column": settings.EFFECTIVE_FROM_DATE_COLUMN,
            "effective_to_date_column": settings.EFFECTIVE_TO_DATE_COLUMN,
            "active_flag_column": settings.ACTIVE_FLAG_COLUMN,
            "unique_scd_keys_for_generation": settings.UNIQUE_SCD_KEYS_FOR_GENERATION,
        }

    # Stage 1: update/delete records.
    print("Starting SCD Update/Delete Generation Process...")
    update_delete_result = scd_operations.scd_update_delete_generation(
        gcs_output_path=gcs_update_delete_output_path,
        percentage_for_update_scd_generation=settings.PERCENTAGE_FOR_UPDATE_SCD_GENERATION,
        **shared_arguments(),
    )
    print(f"SCD Update/Delete Generation Status: {update_delete_result}\n")

    # Stage 2: insert records.
    print("Starting SCD Insert Generation Process...")
    insert_result = scd_operations.scd_insert_generation(
        gcs_output_path=gcs_insert_output_path,
        number_of_insert_record_count=settings.NUMBER_OF_INSERT_RECORD_COUNT,
        **shared_arguments(),
    )
    print(f"SCD Insert Generation Status: {insert_result}\n")
# Run the SCD generation stages only when this file is executed as a script;
# importing the module does not call run_scd_processing().
if __name__ == "__main__":
    run_scd_processing()