in src/translation/dags/extract_ddl_dag.py [0:0]
def _determine_next_taskgroup_from_source(ti, **kwargs):
config = ast.literal_eval(kwargs["dag_run"].conf["config"])["config"]
ti.xcom_push(key="config", value=config)
data_source = config["source"].lower()
logging.info(f"Group branching data_source: {data_source}")
if data_source == "teradata":
logging.info("Teradata")
return "teradata_extraction_taskgroup.check_teradata_jdbc_jar_present"
elif data_source == "hive":
logging.info("Hive")
return "hive_extraction_taskgroup.set_required_vars"
elif data_source == "redshift":
logging.info("Redshift")
return "redshift_extraction_taskgroup.extract_redshift_ddl"
elif data_source == "oracle":
logging.info("Oracle")
return "oracle_extraction_taskgroup.extract_ddl"
else:
logging.info(f"Error: Unsupported data source: {data_source}")
return "end_task"