in src/marketing/src/GoogleAds/src/raw/deploy_raw_layer.py [0:0]
def main():
    """Deploy the raw layer: raw tables, DAG files and raw views.

    Steps, in order:
      1. Parse the pipeline arguments and set the root logger level.
      2. Create the output directory structure and copy the schema files.
      3. For every entry of the `source_to_raw_tables` setting: validate the
         entry, create the raw table if it does not exist yet, generate the
         DAG Python file and, when required, create the raw view.
      4. Copy the dependency files to the dependencies output directory.

    Raises:
        ValueError: If a table entry lacks a required attribute or has an
            empty value for it.
        SystemExit: If `source_to_raw_tables` is absent (or null) in the
            settings - a warning is logged and the process exits without an
            error status - or if creating a raw view fails.
    """
    pipeline_arguments = _parse_args()
    pipeline_temp_location, pipeline_staging_location, debug_arg = (
        pipeline_arguments)

    level = logging.DEBUG if debug_arg else logging.INFO
    logging.getLogger().setLevel(level)

    logging.info("Deploying raw layer...")
    logging.info(
        "\n---------------------------------------\n"
        "Using the following parameters from config:\n"
        " RAW_PROJECT = %s\n"
        " RAW_DATASET = %s\n"
        "---------------------------------------\n", RAW_PROJECT, RAW_DATASET)

    logging.info("Creating required directories for generated files...")
    _create_output_dir_structure()

    logging.info("Copying schema files...")
    shutil.copytree(src=SCHEMA_DIR, dst=SCHEMAS_OUTPUT_DIR, dirs_exist_ok=True)

    dag_start_date = datetime.now(timezone.utc).date()
    client = cortex_bq_client.CortexBQClient(project=RAW_PROJECT)

    # A single lookup covers both a missing key and a key that is present
    # but null; the latter would otherwise fail with a TypeError when
    # iterated below.
    raw_layer_settings = SETTINGS.get("source_to_raw_tables")
    if raw_layer_settings is None:
        logging.warning(
            "❗ property `source_to_raw_tables` missing in settings file. "
            "Skipping raw DAG generation.")
        sys.exit()

    # Attributes every table entry must provide with a non-empty value.
    required_attrs = ("load_frequency", "key", "resource_type", "api_name",
                      "schema_file", "table_name")

    logging.info("Processing raw tables...")
    for raw_table_settings in raw_layer_settings:
        # Making sure all required setting attributes are provided.
        missing_raw_setting_attr = [
            attr for attr in required_attrs
            if raw_table_settings.get(attr) in (None, "")
        ]
        if missing_raw_setting_attr:
            raise ValueError(
                "Setting file is missing or has empty value for one or more "
                f"attributes: {missing_raw_setting_attr} ")

        load_frequency = raw_table_settings.get("load_frequency")
        # Strip whitespace so that "a, b" yields ["a", "b"], not ["a", " b"].
        key_fields = [
            field.strip()
            for field in raw_table_settings.get("key").split(",")
        ]
        resource_type = raw_table_settings.get("resource_type")
        api_name = raw_table_settings.get("api_name")
        schema_file = raw_table_settings.get("schema_file")
        table_name = raw_table_settings.get("table_name")
        # Optional attributes: None when not configured.
        partition_details = raw_table_settings.get("partition_details")
        cluster_details = raw_table_settings.get("cluster_details")

        logging.info("-- Processing table '%s' --", table_name)

        table_mapping_path = Path(SCHEMA_DIR, schema_file)

        logging.info("Creating raw table...")
        full_table_name = RAW_PROJECT + "." + RAW_DATASET + "." + table_name

        # Check if raw table exists.
        raw_table_exists = table_exists(client, full_table_name)
        if not raw_table_exists:
            logging.info("Creating schema...")
            table_schema = create_bq_schema(table_mapping_path)
            logging.debug("Raw table schema: %s\n", repr_schema(table_schema))

            create_table_from_schema(bq_client=client,
                                     schema=table_schema,
                                     full_table_name=full_table_name,
                                     partition_details=partition_details,
                                     cluster_details=cluster_details)
            logging.info("Table is created successfully.")
        else:
            logging.warning("❗ Table already exists. Not creating table.")

        # DAG PY file generation
        logging.info("Generating DAG python file...")
        subs = {
            "project_id": RAW_PROJECT,
            "api_name": api_name,
            "raw_dataset": RAW_DATASET,
            "table_name": table_name,
            "load_frequency": load_frequency,
            "resource_type": resource_type,
            "start_date": dag_start_date,
            "schema_file": table_mapping_path.name,
            "pipeline_temp_location": pipeline_temp_location,
            "pipeline_staging_location": pipeline_staging_location,
            "project_region": PROJECT_REGION,
            "schemas_dir": SCHEMAS_OUTPUT_DIR.stem,
            "pipeline_file":
                str(
                    Path(DEPENDENCIES_OUTPUT_DIR.stem,
                         "ads_source_to_raw_pipeline.py")),
            "pipeline_setup":
                str(Path(DEPENDENCIES_OUTPUT_DIR.stem, "setup.py")),
        }
        _generate_dag_from_template(
            template_file=DAG_TEMPLATE_FILE,
            generation_target_directory=OUTPUT_DIR_FOR_RAW,
            project_id=RAW_PROJECT,
            dataset_id=RAW_DATASET,
            table_name=table_name,
            subs=subs)
        logging.info("Generated dag python file.")

        # Creating view that transforms raw data into format that's usable
        # in CDC layer according to schema defined in settings.
        #
        # View creation is performed in two scenarios:
        #  a. If raw table is created above.
        #  b. If testData flag is true, then raw table is already created by
        #     test harness. We still need to create the views.
        if POPULATE_TEST_DATA or not raw_table_exists:
            logging.info("Creating raw view...")
            try:
                create_view(client=client,
                            table_mapping_path=table_mapping_path,
                            table_name=table_name,
                            raw_project=RAW_PROJECT,
                            raw_dataset=RAW_DATASET,
                            key_fields=key_fields)
            except Exception as e:
                # Top-level boundary: log the cause, then abort the whole
                # deployment with a readable message chained to the error.
                logging.error("Failed to create raw view '%s'.\n"
                              "ERROR: %s", table_name, str(e))
                raise SystemExit(
                    "⛔️ Failed to deploy raw views. Please check the logs."
                ) from e
            logging.info("View created successfully.")

        logging.info("Table processed successfully.")
        logging.info("-----------------------------")

    logging.info("Processed all tables successfully.")

    logging.info("Copying dependencies...")
    shutil.copytree(src=DEPENDENCIES_INPUT_DIR,
                    dst=DEPENDENCIES_OUTPUT_DIR,
                    dirs_exist_ok=True)
    logging.info("Copied dependencies files successfully.")

    logging.info("✅ Raw layer deployed successfully!")