def _determine_next_dag()

in src/translation/dags/controller_dag.py [0:0]


def _determine_next_dag(ti, **kwargs):
    event_type = ti.xcom_pull(key="event_type", task_ids="load_config")
    next_dag_id = None
    if event_type == "OBJECT_FINALIZE":
        config = ti.xcom_pull(key="config", task_ids="load_config")
        op_type = config["type"]
        data_source = config["source"]
        if op_type in ["ddl", "sql", "dml"]:
            data_source = config["source"]
            if data_source in ["teradata", "oracle", "redshift", "DB2"]:
                if "validation_only" in config and config["validation_only"] == "yes":
                    next_dag_id = determine_validation_dag(config)
                elif (
                    "extract_ddl" in config
                    and config["extract_ddl"] == "yes"
                    and op_type not in ["sql", "dml"]
                ):
                    # call DDL extractor DAG
                    next_dag_id = EXTRACT_DDL_DAG_ID
                else:
                    # call batch_translator_dag
                    next_dag_id = BATCH_TRANSLATOR_DAG_ID
            elif data_source == "hive":
                # call DDL extractor DAG
                next_dag_id = EXTRACT_DDL_DAG_ID
            else:
                print(f"Error: Unsupported data source: {data_source}")

        elif op_type == "data":
            if "validation_only" in config and config["validation_only"] == "yes":
                next_dag_id = determine_validation_dag(config)
            elif data_source == "teradata":
                next_dag_id = DATA_LOAD_TERADATA_DAG_ID
            elif data_source == "hive":
                next_dag_id = DATA_LOAD_HIVE_DAG_ID
            elif data_source == "hive_inc":
                next_dag_id = DATA_LOAD_HIVE_INC_DAG_ID
            elif data_source == "redshift":
                next_dag_id = DATA_LOAD_REDSHIFT_DAG_ID
        else:
            print(f"Unsupported operation type: {op_type}")

    elif event_type == "TRANSFER_RUN_FINISHED":
        config = ti.xcom_pull(key="config", task_ids="load_config")
        data_source = config["dataSourceId"]
        if data_source == "on_premises":
            next_dag_id = TERADATA_TRANSFER_RUN_LOG_DAG_ID
        elif data_source == "redshift":
            next_dag_id = REDSHIFT_TRANSFER_RUN_LOG_DAG_ID
    else:
        print(f"Unsupported event type: {event_type}")

    if next_dag_id is None:
        next_task = "end_task"
    else:
        next_task = "determine_next_dag_task"

    ti.xcom_push(key="next_dag_id", value=next_dag_id)

    return next_task