# src/marketing/src/CM360/src/raw/deploy_raw_layer.py
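# Stdlib imports this function relies on. NOTE: the Cortex helper symbols used
# below (cortex_bq_client, table_exists, create_bq_schema, repr_schema,
# create_table_from_schema, generate_dag_from_template, and the SETTINGS /
# *_DIR / *_PROJECT constants) are assumed to come from the repo's shared
# modules; their exact import paths are not shown in this excerpt.
import logging
import shutil
import sys
from datetime import datetime, timezone
from pathlib import Path
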
def main():
"""Main function.
This function expects --pipeline-temp-bucket and --pipeline-staging-bucket
parameters for running the beam pipeline.
"""
parsed_args = _parse_args(sys.argv[1:])
logging.basicConfig(
level=logging.DEBUG if parsed_args.debug else logging.INFO)
logging.info("Parsed args %s", parsed_args)
logging.info("Deploying raw layer...")
logging.info(
"\n---------------------------------------\n"
"Using the following parameters from config:\n"
" RAW_PROJECT = %s \n"
" RAW_DATASET = %s \n"
"---------------------------------------\n", RAW_PROJECT, RAW_DATASET)
logging.info("Creating required directories for generated files...")
_create_output_dir_structure()
logging.info("Copying schema files...")
shutil.copytree(src=SCHEMA_DIR, dst=SCHEMAS_OUTPUT_DIR, dirs_exist_ok=True)
dag_start_date = datetime.now(timezone.utc).date()
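    # Pin the start date to the (UTC) deployment date so every DAG generated
    # in this run shares the same start_date value.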
client = cortex_bq_client.CortexBQClient(project=RAW_PROJECT)
if not "source_to_raw_tables" in SETTINGS:
logging.warning(
"❗ Property `source_to_raw_tables` missing in settings file. "
"Skipping cdc DAG generation.")
sys.exit()
raw_layer_settings = SETTINGS["source_to_raw_tables"]
logging.info("Processing RAW tables...")
for raw_table_settings in raw_layer_settings:
table_name = raw_table_settings["base_table"]
logging.info(" -- Processing table '%s' --", table_name)
table_mapping_path = Path(SCHEMA_DIR, f"{table_name}.csv")
full_table_name = f"{RAW_PROJECT}.{RAW_DATASET}.{table_name}"
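        # Existing tables are left untouched, which keeps re-running this
        # deployment script idempotent with respect to table creation.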
if table_exists(bq_client=client, full_table_name=full_table_name):
logging.warning("❗ Table already exists.")
else:
logging.info("Creating raw table...")
logging.info("Creating schema...")
schema = create_bq_schema(table_mapping_path)
logging.debug("Raw table schema: %s\n", repr_schema(schema))
partition_details = raw_table_settings.get("partition_details")
cluster_details = raw_table_settings.get("cluster_details")
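            # Both keys are optional; .get() returns None when absent, in
            # which case the table is presumably created unpartitioned
            # and/or unclustered.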
create_table_from_schema(bq_client=client,
full_table_name=full_table_name,
schema=schema,
partition_details=partition_details,
cluster_details=cluster_details)
logging.info("Table is created successfully.")
# DAG PY file generation
logging.info("Generating DAG python file...")
load_frequency = raw_table_settings["load_frequency"]
file_pattern = raw_table_settings["file_pattern"]
pipeline_temp_location = parsed_args.pipeline_temp_bucket
pipeline_staging_location = parsed_args.pipeline_staging_bucket
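        # Substitution values rendered into the DAG template below; the keys
        # must match the placeholders defined in _DAG_TEMPLATE_PATH.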
        subs = {
            "project_id": RAW_PROJECT,
            "raw_dataset": RAW_DATASET,
            "table_name": table_name,
            "load_frequency": load_frequency,
            "file_pattern": file_pattern,
            "start_date": dag_start_date,
            "datatransfer_bucket": DATATRANSFER_BUCKET,
            "schema_file": table_mapping_path.name,
            "pipeline_temp_location": pipeline_temp_location,
            "pipeline_staging_location": pipeline_staging_location,
            "project_region": PROJECT_REGION,
            "schemas_dir": SCHEMAS_OUTPUT_DIR.stem,
            "pipeline_file": str(
                Path(DEPENDENCIES_OUTPUT_DIR.stem,
                     "cm360_source_to_raw_pipeline.py")),
            "pipeline_setup": str(Path(DEPENDENCIES_OUTPUT_DIR.stem,
                                       "setup.py")),
        }
generate_dag_from_template(
template_file=_DAG_TEMPLATE_PATH,
generation_target_directory=OUTPUT_DIR_FOR_RAW,
project_id=RAW_PROJECT,
dataset_id=RAW_DATASET,
table_name=table_name,
layer="extract_to_raw",
subs=subs)
logging.info("Generated dag python file.")
logging.info("Table processed successfully.")
logging.info("-----------------------------")
logging.info("Processed all tables successfully.")
logging.info("Copying dependencies...")
shutil.copytree(src=DEPENDENCIES_INPUT_DIR,
dst=DEPENDENCIES_OUTPUT_DIR,
dirs_exist_ok=True)
logging.info("Copied dependencies files successfully.")
logging.info("✅ Raw layer deployed successfully!")