# def main()
#
# in src/marketing/src/GoogleAds/src/raw/deploy_raw_layer.py [0:0]


def main():
    """Deploys the Google Ads raw layer.

    Copies schema files, creates the raw BigQuery tables defined in the
    settings file (skipping tables that already exist), generates one
    Airflow DAG python file per table from the DAG template, creates the
    raw views consumed by the CDC layer, and finally copies pipeline
    dependency files into the output directory.

    Raises:
        ValueError: If a table entry in the settings file is missing (or has
            an empty value for) one of the required attributes.
        SystemExit: If raw view creation fails, or (exit code 0) when the
            `source_to_raw_tables` property is absent from the settings file.
    """
    pipeline_temp_location, pipeline_staging_location, debug_arg = _parse_args()

    level = logging.DEBUG if debug_arg else logging.INFO
    logging.getLogger().setLevel(level)

    logging.info("Deploying raw layer...")
    logging.info(
        "\n---------------------------------------\n"
        "Using the following parameters from config:\n"
        "  RAW_PROJECT = %s\n"
        "  RAW_DATASET = %s\n"
        "---------------------------------------\n", RAW_PROJECT, RAW_DATASET)

    logging.info("Creating required directories for generated files...")
    _create_output_dir_structure()

    logging.info("Copying schema files...")
    shutil.copytree(src=SCHEMA_DIR, dst=SCHEMAS_OUTPUT_DIR, dirs_exist_ok=True)

    # All generated DAGs share a single timezone-aware start date.
    dag_start_date = datetime.now(timezone.utc).date()
    client = cortex_bq_client.CortexBQClient(project=RAW_PROJECT)

    if "source_to_raw_tables" not in SETTINGS:
        logging.warning(
            "❗ property `source_to_raw_tables` missing in settings file. "
            "Skipping raw DAG generation.")
        sys.exit()

    raw_layer_settings = SETTINGS["source_to_raw_tables"]

    logging.info("Processing raw tables...")
    for raw_table_settings in raw_layer_settings:

        # Fail fast with a single consolidated error if any required
        # attribute is missing or empty.
        required_attrs = ("load_frequency", "key", "resource_type",
                          "api_name", "schema_file", "table_name")
        missing_raw_setting_attr = [
            attr for attr in required_attrs
            if raw_table_settings.get(attr) in (None, "")
        ]
        if missing_raw_setting_attr:
            raise ValueError(
                "Setting file is missing or has empty value for one or more "
                f"attributes: {missing_raw_setting_attr} ")

        # Required attributes were validated above, so direct indexing is safe.
        load_frequency = raw_table_settings["load_frequency"]
        key_fields = raw_table_settings["key"].split(",")
        resource_type = raw_table_settings["resource_type"]
        api_name = raw_table_settings["api_name"]
        schema_file = raw_table_settings["schema_file"]
        table_name = raw_table_settings["table_name"]
        # Partitioning and clustering are optional per-table settings.
        partition_details = raw_table_settings.get("partition_details")
        cluster_details = raw_table_settings.get("cluster_details")

        logging.info("-- Processing table '%s' --", table_name)

        table_mapping_path = Path(SCHEMA_DIR, schema_file)

        logging.info("Creating raw table...")
        full_table_name = f"{RAW_PROJECT}.{RAW_DATASET}.{table_name}"
        # Check if raw table exists.
        raw_table_exists = table_exists(client, full_table_name)
        if not raw_table_exists:
            logging.info("Creating schema...")
            table_schema = create_bq_schema(table_mapping_path)
            logging.debug("Raw table schema: %s\n", repr_schema(table_schema))
            create_table_from_schema(bq_client=client,
                                     schema=table_schema,
                                     full_table_name=full_table_name,
                                     partition_details=partition_details,
                                     cluster_details=cluster_details)
            logging.info("Table is created successfully.")
        else:
            logging.warning("❗ Table already exists. Not creating table.")

        # DAG PY file generation: substitutions applied to the DAG template.
        logging.info("Generating DAG python file...")
        subs = {
            "project_id": RAW_PROJECT,
            "api_name": api_name,
            "raw_dataset": RAW_DATASET,
            "table_name": table_name,
            "load_frequency": load_frequency,
            "resource_type": resource_type,
            "start_date": dag_start_date,
            "schema_file": table_mapping_path.name,
            "pipeline_temp_location": pipeline_temp_location,
            "pipeline_staging_location": pipeline_staging_location,
            "project_region": PROJECT_REGION,
            # .stem yields directory names relative to the deployed bundle.
            "schemas_dir": SCHEMAS_OUTPUT_DIR.stem,
            "pipeline_file": str(Path(DEPENDENCIES_OUTPUT_DIR.stem,
                                      "ads_source_to_raw_pipeline.py")),
            "pipeline_setup": str(Path(DEPENDENCIES_OUTPUT_DIR.stem,
                                       "setup.py")),
        }
        _generate_dag_from_template(
            template_file=DAG_TEMPLATE_FILE,
            generation_target_directory=OUTPUT_DIR_FOR_RAW,
            project_id=RAW_PROJECT,
            dataset_id=RAW_DATASET,
            table_name=table_name,
            subs=subs)
        logging.info("Generated dag python file.")

        # Creating view that transforms raw data into format that's usable
        # in CDC layer according to schema defined in settings.
        #
        # View creation is performed in two scenarios:
        # a. If raw table is created above.
        # b. If testData flag is true, then raw table is already created by test
        #    harness. We still need to create the views.
        if POPULATE_TEST_DATA or not raw_table_exists:
            logging.info("Creating raw view...")
            try:
                create_view(client=client,
                            table_mapping_path=table_mapping_path,
                            table_name=table_name,
                            raw_project=RAW_PROJECT,
                            raw_dataset=RAW_DATASET,
                            key_fields=key_fields)
            except Exception as e:
                logging.error("Failed to create raw view '%s'.\n"
                              "ERROR: %s", table_name, str(e))
                raise SystemExit(
                    "⛔️ Failed to deploy raw views. Please check the logs."
                ) from e
            logging.info("View created successfully.")

        logging.info("Table processed successfully.")
        logging.info("-----------------------------")

    logging.info("Processed all tables successfully.")

    logging.info("Copying dependencies...")
    shutil.copytree(src=DEPENDENCIES_INPUT_DIR,
                    dst=DEPENDENCIES_OUTPUT_DIR,
                    dirs_exist_ok=True)
    logging.info("Copied dependencies files successfully.")

    logging.info("✅ Raw layer deployed successfully!")