in src/marketing/src/Meta/src/raw/deploy_raw_layer.py [0:0]
def main():
    """Deploy the Meta RAW layer: create BigQuery tables and generate DAGs.

    Reads table definitions from ``SETTINGS["source_to_raw_tables"]``,
    validates that each entry carries the required attributes, creates any
    missing RAW BigQuery tables from their CSV schema mapping files, and
    renders one Airflow DAG Python file per table from ``DAG_TEMPLATE_FILE``.
    Exits early (without error) when the settings file has no
    ``source_to_raw_tables`` section.

    Raises:
        ValueError: if a table entry is missing / has an empty required
            attribute, or has an unknown ``entity_type``.
    """
    args = _parse_args(sys.argv[1:])
    logging.basicConfig(level=logging.DEBUG if args["debug"] else logging.INFO)
    logging.info("Deploying RAW layer...")
    logging.info(
        "\n---------------------------------------\n"
        "Using the following parameters from config:\n"
        " RAW_PROJECT = %s \n"
        " RAW_DATASET = %s \n"
        "---------------------------------------\n", RAW_PROJECT, RAW_DATASET)
    if "source_to_raw_tables" not in SETTINGS:
        logging.warning(
            "File '%s' is missing property `source_to_raw_tables`. "
            "Skipping RAW DAG generation.", SETTINGS_FILE.name)
        sys.exit()
    logging.info("Processing tables...")
    _create_output_dir_structure()
    logging.info("Copying schema files...")
    shutil.copytree(src=SCHEMA_DIR, dst=SCHEMAS_OUTPUT_DIR, dirs_exist_ok=True)
    logging.info("Copying request fields files...")
    shutil.copytree(src=REQUESTS_DIR,
                    dst=REQUESTS_OUTPUT_DIR,
                    dirs_exist_ok=True)

    bq_client = cortex_bq_client.CortexBQClient(project=RAW_PROJECT)
    raw_layer_settings = SETTINGS.get("source_to_raw_tables")

    # Loop-invariant values, hoisted so they are computed once: the
    # required/allowed settings values and the DAG substitutions that are
    # identical for every table. Hoisting the start date also guarantees all
    # generated DAGs share one date even if the loop straddles midnight UTC.
    required_attrs = ("load_frequency", "base_table", "entity_type",
                      "object_endpoint")
    all_entity_types = ["adaccount", "dimension", "fact"]
    dag_start_date = datetime.now(timezone.utc).date()
    pipeline_setup_file = Path(DEPENDENCIES_OUTPUT_DIR.stem, "setup.py")

    for raw_table_settings in raw_layer_settings:
        logging.info("Checking settings...")
        # Required attributes must be present and non-empty (single lookup
        # per attribute; `in (None, "")` covers both missing and empty).
        missing_raw_setting_attr = [
            attr for attr in required_attrs
            if raw_table_settings.get(attr) in (None, "")
        ]
        if missing_raw_setting_attr:
            raise ValueError(
                "Setting file is missing or has empty value for one or more "
                f"attributes: {missing_raw_setting_attr} ")

        entity_type = raw_table_settings.get("entity_type")
        if entity_type not in all_entity_types:
            raise ValueError(f"{entity_type} is not valid entity type. "
                             f"Possible values: {all_entity_types}.")

        load_frequency = raw_table_settings.get("load_frequency")
        table_name = raw_table_settings.get("base_table")
        object_endpoint = raw_table_settings.get("object_endpoint")
        # Optional attributes: absent values become None here and are
        # substituted into the DAG template as empty strings below.
        object_id_column = raw_table_settings.get("object_id_column")
        breakdowns = raw_table_settings.get("breakdowns")
        action_breakdowns = raw_table_settings.get("action_breakdowns")
        partition_details = raw_table_settings.get("partition_details")
        cluster_details = raw_table_settings.get("cluster_details")

        logging.info("Processing table %s", table_name)
        table_mapping_path = Path(SCHEMA_DIR, f"{table_name}.csv")
        full_table_name = f"{RAW_PROJECT}.{RAW_DATASET}.{table_name}"
        if table_exists(bq_client=bq_client, full_table_name=full_table_name):
            # Existing tables are left untouched — no schema migration here.
            logging.warning("❗ Table already exists.")
        else:
            logging.info("Creating table %s...", table_name)
            schema = read_bq_schema(mapping_file=table_mapping_path,
                                    schema_target_field=SCHEMA_TARGET_FIELD,
                                    system_fields=SYSTEM_FIELDS,
                                    schema_bq_datatype_field=None)
            create_table_from_schema(bq_client=bq_client,
                                     full_table_name=full_table_name,
                                     schema=schema,
                                     partition_details=partition_details,
                                     cluster_details=cluster_details)
        logging.info("Table %s processed successfully.", table_name)

        logging.info("Generating DAG Python file for %s", table_name)
        subs = {
            "project_id": RAW_PROJECT,
            "dataset": RAW_DATASET,
            "table_name": table_name,
            "entity_type": entity_type,
            "load_frequency": load_frequency,
            "object_endpoint": object_endpoint,
            "object_id_column": object_id_column or "",
            "breakdowns": breakdowns or "",
            "action_breakdowns": action_breakdowns or "",
            "start_date": dag_start_date,
            "schemas_dir": SCHEMAS_OUTPUT_DIR.stem,
            "requests_dir": REQUESTS_OUTPUT_DIR.stem,
            "pipeline_staging_bucket": args["pipeline_staging_bucket"],
            "pipeline_temp_bucket": args["pipeline_temp_bucket"],
            "pipeline_setup": str(pipeline_setup_file),
            "project_region": PROJECT_REGION,
        }
        # Dots in table names would break the generated python module name.
        output_dag_py_filename = (
            f"{RAW_PROJECT}_{RAW_DATASET}"
            f"_extract_to_raw_{table_name.replace('.', '_')}.py")
        output_dag_py_path = Path(OUTPUT_DIR_FOR_RAW, output_dag_py_filename)
        generate_file_from_template(DAG_TEMPLATE_FILE, output_dag_py_path,
                                    **subs)
        logging.info("Generated DAG Python file.")

    logging.info("All tables processed successfully.")
    logging.info("Copying dependencies...")
    shutil.copytree(src=DEPENDENCIES_INPUT_DIR,
                    dst=DEPENDENCIES_OUTPUT_DIR,
                    dirs_exist_ok=True)
    logging.info("Dependencies copied successfully.")
    logging.info("✅ RAW layer deployed successfully!")