in src/translation/dags/batch_sql_translation.py [0:0]
def _determine_next_dag(ti):
config = ti.xcom_pull(key="config", task_ids="create_translation_workflow")
run_type = (
config[TRANSLATION_CONFIG_KEY]
if TRANSLATION_CONFIG_KEY in config
else TRANSLATION_CONFIG_DEFAULT_VALUE
)
if run_type == TRANSLATION_CONFIG_DDL:
return "invoke_schema_dag"
elif run_type == TRANSLATION_CONFIG_SQL:
validation_mode = config["validation_config"].get("validation_mode")
validation_dag_id = get_validation_dag_id(validation_mode)
if validation_dag_id == VALIDATION_DAG_ID:
return "invoke_validation_dag"
else:
return "invoke_validation_crun_dag"
elif run_type == TRANSLATION_CONFIG_DML:
return "invoke_dml_validation_dag"
else:
raise ValueError(f"invalid value for translation field: {run_type}")