in src/translation/dags/controller_dag.py
def _determine_next_dag(ti, **kwargs):
    """Branch callable: decide which downstream DAG (if any) to trigger next.

    Reads the event type and parsed config pushed by the load_config task,
    publishes the chosen DAG id as the "next_dag_id" XCom, and returns the
    task id the branch should follow.
    """
    event_type = ti.xcom_pull(key="event_type", task_ids="load_config")
    next_dag_id = None
    if event_type == "OBJECT_FINALIZE":
        config = ti.xcom_pull(key="config", task_ids="load_config")
        op_type = config["type"]
        data_source = config["source"]
        if op_type in ["ddl", "sql", "dml"]:
            if data_source in ["teradata", "oracle", "redshift", "DB2"]:
                if config.get("validation_only") == "yes":
                    next_dag_id = determine_validation_dag(config)
                elif (
                    config.get("extract_ddl") == "yes"
                    and op_type not in ["sql", "dml"]
                ):
                    # Call the DDL extractor DAG.
                    next_dag_id = EXTRACT_DDL_DAG_ID
                else:
                    # Call the batch translator DAG.
                    next_dag_id = BATCH_TRANSLATOR_DAG_ID
            elif data_source == "hive":
                # Call the DDL extractor DAG.
                next_dag_id = EXTRACT_DDL_DAG_ID
            else:
                print(f"Error: Unsupported data source: {data_source}")
        elif op_type == "data":
            if config.get("validation_only") == "yes":
                next_dag_id = determine_validation_dag(config)
            elif data_source == "teradata":
                next_dag_id = DATA_LOAD_TERADATA_DAG_ID
            elif data_source == "hive":
                next_dag_id = DATA_LOAD_HIVE_DAG_ID
            elif data_source == "hive_inc":
                next_dag_id = DATA_LOAD_HIVE_INC_DAG_ID
            elif data_source == "redshift":
                next_dag_id = DATA_LOAD_REDSHIFT_DAG_ID
        else:
            print(f"Unsupported operation type: {op_type}")
    elif event_type == "TRANSFER_RUN_FINISHED":
        config = ti.xcom_pull(key="config", task_ids="load_config")
        data_source = config["dataSourceId"]
        if data_source == "on_premises":
            next_dag_id = TERADATA_TRANSFER_RUN_LOG_DAG_ID
        elif data_source == "redshift":
            next_dag_id = REDSHIFT_TRANSFER_RUN_LOG_DAG_ID
    else:
        print(f"Unsupported event type: {event_type}")

    # If no DAG matched, short-circuit to the end task; otherwise continue to
    # the task that triggers the selected DAG.
    if next_dag_id is None:
        next_task = "end_task"
    else:
        next_task = "determine_next_dag_task"
    ti.xcom_push(key="next_dag_id", value=next_dag_id)
    return next_task
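
# A minimal wiring sketch (not taken from this repository) showing how a branch
# callable like the one above is typically attached to the controller DAG.
# The operator choices, DAG arguments, and the EmptyOperator placeholders are
# assumptions (Airflow 2.x style); only the task ids "load_config",
# "determine_next_dag_task", and "end_task" and the "next_dag_id" XCom key
# come from the function above.
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator

with DAG(
    dag_id="controller_dag",
    start_date=datetime(2024, 1, 1),
    schedule=None,  # assumes Airflow 2.4+; older versions use schedule_interval=None
    catchup=False,
) as dag:
    # Placeholder for the upstream task assumed to push the "event_type"
    # and "config" XComs consumed by _determine_next_dag.
    load_config = EmptyOperator(task_id="load_config")

    # BranchPythonOperator follows whichever downstream task id the callable returns.
    determine_next_dag = BranchPythonOperator(
        task_id="determine_next_dag",
        python_callable=_determine_next_dag,
    )

    # Placeholder for the task that triggers the DAG stored in the "next_dag_id"
    # XCom (a TriggerDagRunOperator in a real deployment), and the end task.
    determine_next_dag_task = EmptyOperator(task_id="determine_next_dag_task")
    end_task = EmptyOperator(task_id="end_task")

    load_config >> determine_next_dag >> [determine_next_dag_task, end_task]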