in src/marketing/src/config_validator.py [0:0]
def validate(cfg: dict) -> Union[dict, None]:
    """Validates the 'marketing' section of the configuration.

    Validation is skipped entirely when `deployMarketing` is falsy. Otherwise,
    for every data source whose `deploy<Source>` flag is truthy, the matching
    section under `marketing` must be present and non-empty, and the
    source-specific validator is run against `cfg`. Every problem is logged
    and validation continues after a failure, so one run reports all problems
    rather than only the first.

    Args:
        cfg (dict): Config dictionary.

    Returns:
        dict: The same `cfg` object, when it is valid or when marketing is
            not being deployed.
        None: When the 'marketing' config is missing or invalid.
    """
    if not cfg.get("deployMarketing", False):
        logging.info("'marketing' is not being deployed. Skipping validation.")
        return cfg

    logging.info("Validating 'marketing' configuration...")

    marketing = cfg.get("marketing")
    if not marketing:
        logging.error("🛑 Missing 'marketing' values in the config file. 🛑")
        return None

    # One row per data source:
    #   (deploy flag, config section, label used in failure logs,
    #    validator call, whether the source requires `dataflowRegion`).
    # Validator calls are wrapped in lambdas so that a validator is only
    # looked up and invoked when its data source is actually deployed.
    sources = (
        ("deployGoogleAds", "GoogleAds", "GoogleAds",
         lambda: _validate_googleads(cfg), True),
        ("deployCM360", "CM360", "CM360",
         lambda: _validate_cm360(cfg), True),
        ("deployTikTok", "TikTok", "TikTok",
         lambda: _validate_tiktok(cfg), True),
        ("deployLiveRamp", "LiveRamp", "LiveRamp",
         lambda: _validate_liveramp(cfg), True),
        ("deployMeta", "Meta", "Meta",
         lambda: _validate_meta(cfg), True),
        ("deploySFMC", "SFMC", "SFMC",
         lambda: _validate_sfmc(cfg), True),
        ("deployDV360", "DV360", "DV360",
         lambda: _validate_dv360(cfg), False),
        ("deployGA4", "GA4", "Google Analytics 4",
         lambda: _validate_ga4(cfg), False),
    )

    # Missing or empty deploy flags are only warned about: an absent flag is
    # treated below as "do not deploy this data source".
    missing_marketing_attr = [
        deploy_attr for deploy_attr, *_ in sources
        if marketing.get(deploy_attr) is None or
        marketing.get(deploy_attr) == ""
    ]
    if missing_marketing_attr:
        logging.warning(
            "⚠️ Config file is missing some Marketing attributes or "
            "has empty value: %s. ⚠️", missing_marketing_attr)

    failed = False
    needs_dataflow_region = False
    for deploy_attr, section, label, run_validator, uses_dataflow in sources:
        if not marketing.get(deploy_attr):
            continue
        needs_dataflow_region = needs_dataflow_region or uses_dataflow

        if not marketing.get(section):
            logging.error(
                "🛑 Missing 'marketing' '%s' attribute "
                "in the config file. 🛑", section)
            failed = True
            continue

        try:
            run_validator()
        except Exception:  # pylint: disable=broad-except
            # Keep going so that problems in other data sources are
            # reported in the same run.
            logging.exception("🛑 %s config validation failed. 🛑", label)
            failed = True

    # dataflowRegion is only required for certain data sources.
    if needs_dataflow_region:
        dataflow_region = marketing.get("dataflowRegion")
        location = cfg.get("location")
        if not dataflow_region:
            logging.error("🛑 Config file is missing or has empty value for "
                          "required attribute: dataflowRegion. 🛑")
            failed = True
        elif not location:
            # Without a location the region cannot be checked; report it as
            # a validation failure instead of raising KeyError.
            logging.error("🛑 Config file is missing or has empty value for "
                          "required attribute: location. 🛑")
            failed = True
        else:
            region = str(dataflow_region).lower()
            expected = str(location).lower()
            # The region must either equal the location or be a region
            # inside it (for example `us-central1` for location `us`).
            if region != expected and not region.startswith(f"{expected}-"):
                logging.error(
                    "🛑 Invalid `dataflowRegion`: `%s`. "
                    "It's expected to be in `%s`. 🛑",
                    dataflow_region, location)
                failed = True

    if failed:
        logging.error("🛑 'marketing' config is invalid. 🛑")
        return None

    logging.info(
        "✅ 'marketing' config validated successfully. Looks good. ✅")
    return cfg