in src/cdc_dag_generator/generate_views.py [0:0]
def main():
logging.basicConfig(level=logging.INFO)
logging.info("Generating CDC views...")
config_dict = load_config_file(_CONFIG_FILE)
logging.info(
"\n---------------------------------------\n"
"Using the following config:\n %s"
"\n---------------------------------------\n",
json.dumps(config_dict, indent=4))
# Read params from the config
project_id = config_dict.get("projectIdSource")
raw_dataset = config_dict.get("SFDC").get("datasets").get("raw")
cdc_dataset = config_dict.get("SFDC").get("datasets").get("cdc")
logging.info(
"\n---------------------------------------\n"
"Using the following parameters from config:\n"
" source_project_id = %s \n"
" raw_dataset = %s \n"
" cdc_dataset = %s \n"
"---------------------------------------\n", project_id, raw_dataset,
cdc_dataset)
Path(_GENERATED_VIEW_SQL_DIR).mkdir(exist_ok=True, parents=True)
# Process tables based on table settings from settings file
logging.info("Reading table settings...")
if not Path(_SETTINGS_FILE).is_file():
logging.warning(
"File '%s' does not exist. Skipping CDC view generation.",
_SETTINGS_FILE)
sys.exit()
with open(_SETTINGS_FILE, encoding="utf-8") as settings_file:
settings = yaml.load(settings_file, Loader=yaml.SafeLoader)
if not settings:
logging.warning("File '%s' is empty. Skipping CDC view generation.",
_SETTINGS_FILE)
sys.exit()
# TODO: Check settings file schema.
if not "raw_to_cdc_tables" in settings:
logging.warning(
"File '%s' is missing property `raw_to_cdc_tables`. "
"Skipping CDC view generation.", _SETTINGS_FILE)
sys.exit()
table_settings = settings["raw_to_cdc_tables"]
bq_client = cortex_bq_client.CortexBQClient()
logging.info("Processing tables...")
for table_setting in table_settings:
process_table(bq_client, table_setting, project_id, raw_dataset,
cdc_dataset)
logging.info("Done generating CDC views.")