in src/SAP/SAP_REPORTING/common/py_libs/jinja.py [0:0]
def initialize_jinja_from_config(config_dict: dict) -> dict:
"""Generates jinja parameters dictionary from configuration."""
jinja_data_file_dict = {
"project_id_src": config_dict["projectIdSource"],
"project_id_tgt": config_dict["projectIdTarget"],
"location": config_dict["location"],
"k9_datasets_processing": config_dict["k9"]["datasets"]["processing"],
"k9_datasets_reporting": config_dict["k9"]["datasets"]["reporting"]
}
jinja_data_file_dict.update({"deploy_sap": config_dict.get("deploySAP")})
jinja_data_file_dict.update({"deploy_sfdc": config_dict.get("deploySFDC")})
jinja_data_file_dict.update(
{"deploy_oracleebs": config_dict.get("deployOracleEBS")})
# SFDC
if config_dict.get("deploySFDC"):
jinja_data_file_dict.update({
"sfdc_datasets_raw":
config_dict["SFDC"]["datasets"]["raw"],
"sfdc_datasets_cdc":
config_dict["SFDC"]["datasets"]["cdc"],
"sfdc_datasets_reporting":
config_dict["SFDC"]["datasets"]["reporting"],
"sfdc_currencies":
config_dict["SFDC"]["currencies"]
})
# SAP
# Flavor specific fields use `get()` because they do not exist in the raw
# config and accessing them directly causes the function to fail.
if config_dict.get("deploySAP"):
jinja_data_file_dict.update({
# raw datasets
"dataset_raw_landing":
config_dict["SAP"]["datasets"]["raw"],
"dataset_raw_landing_ecc":
config_dict["SAP"]["datasets"].get("rawECC"),
"dataset_raw_landing_s4":
config_dict["SAP"]["datasets"].get("rawS4"),
# cdc datasets
"dataset_cdc_processed":
config_dict["SAP"]["datasets"]["cdc"],
"dataset_cdc_processed_ecc":
config_dict["SAP"]["datasets"].get("cdcECC"),
"dataset_cdc_processed_s4":
config_dict["SAP"]["datasets"].get("cdcS4"),
# reporting datasets
"dataset_reporting_tgt":
config_dict["SAP"]["datasets"]["reporting"],
# mandt
"mandt":
config_dict["SAP"]["mandt"],
"mandt_ecc":
config_dict["SAP"].get("mandtECC"),
"mandt_s4":
config_dict["SAP"].get("mandtS4"),
# Misc
# We only use lowercase SQLFlavor in our templates
"sql_flavor":
config_dict["SAP"]["SQLFlavor"].lower(),
"sql_flavour":
config_dict["SAP"]["SQLFlavor"].lower(),
"sap_currencies":
config_dict["SAP"]["currencies"],
"sap_languages":
config_dict["SAP"]["languages"]
})
if config_dict.get("deployMarketing"):
jinja_data_file_dict.update({
"deploy_googleads": config_dict["marketing"].get("deployGoogleAds")
})
jinja_data_file_dict.update(
{"deploy_cm360": config_dict["marketing"].get("deployCM360")})
jinja_data_file_dict.update(
{"deploy_tiktok": config_dict["marketing"].get("deployTikTok")})
jinja_data_file_dict.update(
{"deploy_liveramp": config_dict["marketing"].get("deployLiveRamp")})
jinja_data_file_dict.update(
{"deploy_meta": config_dict["marketing"].get("deployMeta")})
jinja_data_file_dict.update(
{"deploy_sfmc": config_dict["marketing"].get("deploySFMC")})
jinja_data_file_dict.update(
{"deploy_dv360": config_dict["marketing"].get("deployDV360")})
jinja_data_file_dict.update(
{"deploy_ga4": config_dict["marketing"].get("deployGA4")})
# GoogleAds
if config_dict["marketing"].get("deployGoogleAds"):
jinja_data_file_dict.update({
"marketing_googleads_datasets_raw":
config_dict["marketing"]["GoogleAds"]["datasets"]["raw"],
"marketing_googleads_datasets_cdc":
config_dict["marketing"]["GoogleAds"]["datasets"]["cdc"],
"marketing_googleads_datasets_reporting":
config_dict["marketing"]["GoogleAds"]["datasets"]
["reporting"]
})
# CM360
if config_dict["marketing"].get("deployCM360"):
jinja_data_file_dict.update({
"marketing_cm360_datasets_raw":
config_dict["marketing"]["CM360"]["datasets"]["raw"],
"marketing_cm360_datasets_cdc":
config_dict["marketing"]["CM360"]["datasets"]["cdc"],
"marketing_cm360_datasets_reporting":
config_dict["marketing"]["CM360"]["datasets"]["reporting"]
})
# TikTok
if config_dict["marketing"].get("deployTikTok"):
jinja_data_file_dict.update({
"marketing_tiktok_datasets_raw":
config_dict["marketing"]["TikTok"]["datasets"]["raw"],
"marketing_tiktok_datasets_cdc":
config_dict["marketing"]["TikTok"]["datasets"]["cdc"],
"marketing_tiktok_datasets_reporting":
config_dict["marketing"]["TikTok"]["datasets"]["reporting"]
})
# LiveRamp
if config_dict["marketing"].get("deployLiveRamp"):
jinja_data_file_dict.update({
"marketing_liveramp_datasets_cdc":
config_dict["marketing"]["LiveRamp"]["datasets"]["cdc"]
})
# Meta
if config_dict["marketing"].get("deployMeta"):
jinja_data_file_dict.update({
"marketing_meta_datasets_raw":
config_dict["marketing"]["Meta"]["datasets"]["raw"],
"marketing_meta_datasets_cdc":
config_dict["marketing"]["Meta"]["datasets"]["cdc"],
"marketing_meta_datasets_reporting":
config_dict["marketing"]["Meta"]["datasets"]["reporting"]
})
# SFMC
if config_dict["marketing"].get("deploySFMC"):
jinja_data_file_dict.update({
"marketing_sfmc_datasets_raw":
config_dict["marketing"]["SFMC"]["datasets"]["raw"],
"marketing_sfmc_datasets_cdc":
config_dict["marketing"]["SFMC"]["datasets"]["cdc"],
"marketing_sfmc_datasets_reporting":
config_dict["marketing"]["SFMC"]["datasets"]["reporting"]
})
# DV360
if config_dict["marketing"].get("deployDV360"):
jinja_data_file_dict.update({
"marketing_dv360_datasets_raw":
config_dict["marketing"]["DV360"]["datasets"]["raw"],
"marketing_dv360_datasets_cdc":
config_dict["marketing"]["DV360"]["datasets"]["cdc"],
"marketing_dv360_datasets_reporting":
config_dict["marketing"]["DV360"]["datasets"]["reporting"]
})
# Google Analytics 4
if config_dict["marketing"].get("deployGA4"):
jinja_data_file_dict.update({
"marketing_ga4_datasets_cdc":
config_dict["marketing"]["GA4"]["datasets"]["cdc"],
"marketing_ga4_datasets_reporting":
config_dict["marketing"]["GA4"]["datasets"]["reporting"]
})
# Oracle EBS
if config_dict.get("deployOracleEBS"):
jinja_data_file_dict.update({
"oracle_ebs_datasets_cdc":
config_dict["OracleEBS"]["datasets"]["cdc"],
"oracle_ebs_datasets_reporting":
config_dict["OracleEBS"]["datasets"]["reporting"],
"oracle_ebs_item_category_set_ids":
config_dict["OracleEBS"]["itemCategorySetIds"],
"oracle_ebs_currency_conversion_type":
config_dict["OracleEBS"]["currencyConversionType"],
"oracle_ebs_currency_conversion_targets":
config_dict["OracleEBS"]["currencyConversionTargets"],
"oracle_ebs_languages":
config_dict["OracleEBS"]["languages"],
})
# Currency Conversion
if config_dict["k9"].get("deployCurrencyConversion"):
jinja_data_file_dict.update({
"k9_currency_conversion_data_source_type":
config_dict["k9"]["CurrencyConversion"]["dataSourceType"],
"k9_currency_conversion_rate_type":
config_dict["k9"]["CurrencyConversion"]["rateType"],
})
# ProductDim
if config_dict["k9"].get("deployProductDim"):
jinja_data_file_dict.update({
"k9_product_dim_data_source_type":
config_dict["k9"]["ProductDim"]["dataSourceType"],
"k9_product_dim_text_language":
config_dict["k9"]["ProductDim"]["textLanguage"],
})
# CrossMedia
if config_dict["k9"].get("deployCrossMedia"):
cm_config = config_dict["k9"]["CrossMedia"]
jinja_data_file_dict.update({
"k9_cross_media_target_currencies":
cm_config["targetCurrencies"],
"k9_cross_media_lookback_window_days":
cm_config["lookbackWindowDays"],
"k9_cross_media_additional_prompt":
cm_config["additionalPrompt"],
"k9_cross_media_product_hierarchy_type":
cm_config["productHierarchyType"],
"k9_cross_media_max_product_hierarchy_match_level":
cm_config["maxProductHierarchyMatchLevel"],
"k9_cross_media_text_generation_model":
cm_config.get("textGenerationModel",
constants.K9_CROSS_MEDIA_DEFAULT_TEXT_GENERATION_MODEL),
})
# Meridian
if config_dict["k9"].get("deployMeridian"):
cm_config = config_dict["k9"]["Meridian"]
jinja_data_file_dict.update({
"k9_meridian_sales_data_source_type":
cm_config["salesDataSourceType"]
})
jinja_data_file_dict.update({
"k9_meridian_sales_dataset_id":
cm_config["salesDatasetID"]
})
# Vertex AI
if config_dict["VertexAI"]:
jinja_data_file_dict.update({
"vertexai_region":
config_dict["VertexAI"]["region"],
"vertexai_datasets_processing":
config_dict["VertexAI"]["processingDataset"]
})
return jinja_data_file_dict