def _determine_next_dag()

in src/translation/dags/batch_sql_translation.py [0:0]


def _determine_next_dag(ti):
    config = ti.xcom_pull(key="config", task_ids="create_translation_workflow")
    run_type = (
        config[TRANSLATION_CONFIG_KEY]
        if TRANSLATION_CONFIG_KEY in config
        else TRANSLATION_CONFIG_DEFAULT_VALUE
    )
    if run_type == TRANSLATION_CONFIG_DDL:
        return "invoke_schema_dag"
    elif run_type == TRANSLATION_CONFIG_SQL:
        validation_mode = config["validation_config"].get("validation_mode")
        validation_dag_id = get_validation_dag_id(validation_mode)
        if validation_dag_id == VALIDATION_DAG_ID:
            return "invoke_validation_dag"
        else:
            return "invoke_validation_crun_dag"
    elif run_type == TRANSLATION_CONFIG_DML:
        return "invoke_dml_validation_dag"
    else:
        raise ValueError(f"invalid value for translation field: {run_type}")