def main()

in src/marketing/src/Meta/src/raw/deploy_raw_layer.py


def main():
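    # Parse command-line flags; a hedged sketch of the expected flags
    # follows this listing.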
    args = _parse_args(sys.argv[1:])
    logging.basicConfig(level=logging.DEBUG if args["debug"] else logging.INFO)

    logging.info("Deploying RAW layer...")
    logging.info(
        "\n---------------------------------------\n"
        "Using the following parameters from config:\n"
        "  RAW_PROJECT = %s \n"
        "  RAW_DATASET = %s \n"
        "---------------------------------------\n", RAW_PROJECT, RAW_DATASET)

    if not "source_to_raw_tables" in SETTINGS:
        logging.warning(
            "File '%s' is missing property `source_to_raw_tables`. "
            "Skipping RAW DAG generation.", SETTINGS_FILE.name)
        sys.exit()

    logging.info("Processing tables...")

    _create_output_dir_structure()

    logging.info("Copying schema files...")
    shutil.copytree(src=SCHEMA_DIR, dst=SCHEMAS_OUTPUT_DIR, dirs_exist_ok=True)
    logging.info("Copying request fields files...")
    shutil.copytree(src=REQUESTS_DIR,
                    dst=REQUESTS_OUTPUT_DIR,
                    dirs_exist_ok=True)

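    # BigQuery client used below to check for and create the RAW tables.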
    bq_client = cortex_bq_client.CortexBQClient(project=RAW_PROJECT)

    raw_layer_settings = SETTINGS.get("source_to_raw_tables")
    for raw_table_settings in raw_layer_settings:

        logging.info("Checking settings...")

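        # The following four attributes are required and must be non-empty;
        # the attributes read further below are optional and may be absent.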
        missing_raw_setting_attr = []
        for attr in ("load_frequency", "base_table", "entity_type",
                     "object_endpoint"):
            value = raw_table_settings.get(attr)
            if value is None or value == "":
                missing_raw_setting_attr.append(attr)
        if missing_raw_setting_attr:
            raise ValueError(
                "Settings file is missing or has an empty value for one or "
                f"more attributes: {missing_raw_setting_attr}")

        all_entity_types = ["adaccount", "dimension", "fact"]
        entity_type = raw_table_settings.get("entity_type")
        if entity_type not in all_entity_types:
            raise ValueError(f"{entity_type} is not valid entity type. "
                             f"Possible values: {all_entity_types}.")

        load_frequency = raw_table_settings.get("load_frequency")
        table_name = raw_table_settings.get("base_table")
        object_endpoint = raw_table_settings.get("object_endpoint")
        object_id_column = raw_table_settings.get("object_id_column")
        breakdowns = raw_table_settings.get("breakdowns")
        action_breakdowns = raw_table_settings.get("action_breakdowns")
        partition_details = raw_table_settings.get("partition_details")
        cluster_details = raw_table_settings.get("cluster_details")

        logging.info("Processing table %s", table_name)

        table_mapping_path = Path(SCHEMA_DIR, f"{table_name}.csv")
        full_table_name = f"{RAW_PROJECT}.{RAW_DATASET}.{table_name}"

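        # Create the RAW table only when it does not already exist; an
        # existing table is left untouched, so reruns are non-destructive.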
        if table_exists(bq_client=bq_client, full_table_name=full_table_name):
            logging.warning("❗ Table already exists.")
        else:
            logging.info("Creating table %s...", table_name)

            schema = read_bq_schema(mapping_file=table_mapping_path,
                                    schema_target_field=SCHEMA_TARGET_FIELD,
                                    system_fields=SYSTEM_FIELDS,
                                    schema_bq_datatype_field=None)

            create_table_from_schema(bq_client=bq_client,
                                     full_table_name=full_table_name,
                                     schema=schema,
                                     partition_details=partition_details,
                                     cluster_details=cluster_details)

            logging.info("Table %s processed successfully.", table_name)

        logging.info("Generating DAG Python file for %s", table_name)

        dag_start_date = datetime.now(timezone.utc).date()
        pipeline_setup_file = Path(DEPENDENCIES_OUTPUT_DIR.stem, "setup.py")

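        # Values substituted into the DAG template for this table.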
        subs = {
            "project_id": RAW_PROJECT,
            "dataset": RAW_DATASET,
            "table_name": table_name,
            "entity_type": entity_type,
            "load_frequency": load_frequency,
            "object_endpoint": object_endpoint,
            "object_id_column": object_id_column or "",
            "breakdowns": breakdowns or "",
            "action_breakdowns": action_breakdowns or "",
            "start_date": dag_start_date,
            "schemas_dir": SCHEMAS_OUTPUT_DIR.stem,
            "requests_dir": REQUESTS_OUTPUT_DIR.stem,
            "pipeline_staging_bucket": args["pipeline_staging_bucket"],
            "pipeline_temp_bucket": args["pipeline_temp_bucket"],
            "pipeline_setup": str(pipeline_setup_file),
            "project_region": PROJECT_REGION,
        }

        output_dag_py_filename = (
            f"{RAW_PROJECT}_{RAW_DATASET}"
            f"_extract_to_raw_{table_name.replace('.', '_')}.py")
        output_dag_py_path = Path(OUTPUT_DIR_FOR_RAW, output_dag_py_filename)
        generate_file_from_template(DAG_TEMPLATE_FILE, output_dag_py_path,
                                    **subs)

        logging.info("Generated DAG Python file.")

    logging.info("All tables processed successfully.")
    logging.info("Copying dependencies...")

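    # Ship the shared pipeline dependencies (including the setup.py referenced
    # by the generated DAGs) alongside the output.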
    shutil.copytree(src=DEPENDENCIES_INPUT_DIR,
                    dst=DEPENDENCIES_OUTPUT_DIR,
                    dirs_exist_ok=True)

    logging.info("Dependencies copied successfully.")
    logging.info("✅ RAW layer deployed successfully!")