in src/config_validator.py [0:0]
def validate(cfg: dict) -> Union[dict, None]:
"""Validates and processes configuration.
It will discover and log all issues before returning.
Args:
cfg (dict): Config dictionary.
Returns:
dict: Processed config dictionary.
None: In case of validation failure
"""
failed = False
if not cfg.get("deploySFDC"):
logging.info("SFDC is not being deployed. Skipping validation.")
return cfg
logging.info("Validating SFDC configuration.")
sfdc = cfg.get("SFDC")
if not sfdc:
logging.error("🛑 Missing 'SFDC' values in the config file. 🛑")
return None
missing_sfdc_attrs = []
for attr in ("deployCDC", "currencies", "datasets"):
if sfdc.get(attr) is None or sfdc.get(attr) == "":
missing_sfdc_attrs.append(attr)
if missing_sfdc_attrs:
logging.error("🛑 Missing 'SFDC values: %s' in the config file. 🛑",
missing_sfdc_attrs)
failed = True
datasets = sfdc.get("datasets")
sfdc["createMappingViews"] = sfdc.get("createMappingViews", True)
cdc = datasets.get("cdc")
if not cdc:
logging.error("🛑 Missing 'SFDC/datasets/cdc' values "
"in the config file. 🛑")
failed = True
raw = datasets.get("raw")
if not raw:
logging.error("🛑 Missing 'SFDC/datasets/raw' values "
"in the config file. 🛑")
failed = True
reporting = datasets.get("reporting", "REPORTING_SFDC")
datasets["reporting"] = reporting
source = cfg["projectIdSource"]
target = cfg["projectIdTarget"]
location = cfg["location"]
datasets = [
resource_validation_helper.DatasetConstraints(f"{source}.{raw}", True,
True, location),
resource_validation_helper.DatasetConstraints(f"{source}.{cdc}", True,
True, location),
resource_validation_helper.DatasetConstraints(f"{target}.{reporting}",
False, True, location)
]
if not resource_validation_helper.validate_resources([], datasets):
logging.error("🛑 'SFDC' resource validation failed. 🛑")
failed = True
if failed:
logging.error("🛑 SFDC configuration is invalid. 🛑")
return None
else:
logging.info("✅ SFDC configuration is good. ✅")
return cfg