in src/raw_dag_generator/generate_dags.py [0:0]
def main():
    """Generate the raw DAG files and copy their Python dependencies.

    Steps, in order:
      1. Load the config file (`_CONFIG_FILE`) and log it.
      2. Read the source project, the SFDC raw dataset and the location
         from that config.
      3. Create the generated-DAG and dependency output directories.
      4. Read the settings file (`_SETTINGS_FILE`) and call `process_table`
         once for every entry under `salesforce_to_raw_tables`.
      5. Copy the dependency directory next to the generated DAGs.

    If the settings file is missing, empty, or has no
    `salesforce_to_raw_tables` property, a warning is logged and the process
    exits via `sys.exit()` (exit status 0) -- the generation is skipped, not
    treated as a failure.

    Raises:
        ValueError: If the config has no `SFDC.datasets` mapping, so the raw
            dataset cannot be determined.
    """
    logging.basicConfig(level=logging.INFO)
    logging.info("Generating raw dags...")

    # Lets load configs to get various parameters needed for the dag generation.
    config_dict = load_config_file(_CONFIG_FILE)
    logging.info(
        "\n---------------------------------------\n"
        "Using the following config:\n %s"
        "\n---------------------------------------\n",
        json.dumps(config_dict, indent=4))

    raw_project = config_dict.get("projectIdSource")

    # Walk the nested config one level at a time so that a missing section
    # produces an actionable message instead of
    # "'NoneType' object has no attribute 'get'".
    sfdc_config = config_dict.get("SFDC")
    sfdc_datasets = (sfdc_config.get("datasets")
                     if isinstance(sfdc_config, dict) else None)
    if not isinstance(sfdc_datasets, dict):
        raise ValueError(
            f"Config file '{_CONFIG_FILE}' is missing the `SFDC.datasets` "
            "section; cannot determine the raw dataset.")
    raw_dataset = sfdc_datasets.get("raw")

    location = config_dict.get("location", "US")

    logging.info(
        "\n---------------------------------------\n"
        "Using the following parameters from config:\n"
        "  raw_project = %s \n"
        "  raw_dataset = %s \n"
        "  location = %s \n"
        "---------------------------------------\n", raw_project, raw_dataset,
        location)

    # Output directories are created up front, before the settings file is
    # inspected, so they exist even when generation ends up being skipped.
    Path(_GENERATED_DAG_DIR).mkdir(exist_ok=True, parents=True)
    Path(_DEPENDENCIES_OUTPUT_DIR).mkdir(exist_ok=True, parents=True)

    # Process tables based on configs from settings file
    logging.info("Reading configs...")

    if not Path(_SETTINGS_FILE).is_file():
        logging.warning(
            "File '%s' does not exist. Skipping Raw DAG generation.",
            _SETTINGS_FILE)
        sys.exit()

    with open(_SETTINGS_FILE, encoding="utf-8") as settings_file:
        configs = yaml.load(settings_file, Loader=yaml.SafeLoader)

    # An empty YAML document loads as None.
    if not configs:
        logging.warning("File '%s' is empty. Skipping Raw DAG generation.",
                        _SETTINGS_FILE)
        sys.exit()

    if "salesforce_to_raw_tables" not in configs:
        logging.warning(
            "File '%s' is missing property `salesforce_to_raw_tables`. "
            "Skipping Raw DAG generation.", _SETTINGS_FILE)
        sys.exit()

    logging.info("Processing tables...")

    bq_client = cortex_bq_client.CortexBQClient()

    # A key that is present but has no entries (`salesforce_to_raw_tables:`)
    # loads as None; treat that the same as an empty list rather than
    # failing with a TypeError when iterating.
    table_configs = configs["salesforce_to_raw_tables"] or []
    if not table_configs:
        logging.warning(
            "Property `salesforce_to_raw_tables` in file '%s' has no "
            "entries. No tables will be processed.", _SETTINGS_FILE)

    for table_config in table_configs:
        process_table(bq_client, table_config, raw_dataset, raw_project)

    # Copy Dependencies for the DAG Python files too
    logging.info("Copying dependencies...")
    shutil.copytree(src=_DEPENDENCIES_INPUT_DIR,
                    dst=_DEPENDENCIES_OUTPUT_DIR,
                    dirs_exist_ok=True)

    logging.info("Done generating raw dags.")