def _determine_next_taskgroup_from_source()

in src/translation/dags/extract_ddl_dag.py [0:0]


def _determine_next_taskgroup_from_source(ti, **kwargs):
    config = ast.literal_eval(kwargs["dag_run"].conf["config"])["config"]
    ti.xcom_push(key="config", value=config)

    data_source = config["source"].lower()

    logging.info(f"Group branching data_source: {data_source}")
    if data_source == "teradata":
        logging.info("Teradata")
        return "teradata_extraction_taskgroup.check_teradata_jdbc_jar_present"
    elif data_source == "hive":
        logging.info("Hive")
        return "hive_extraction_taskgroup.set_required_vars"
    elif data_source == "redshift":
        logging.info("Redshift")
        return "redshift_extraction_taskgroup.extract_redshift_ddl"
    elif data_source == "oracle":
        logging.info("Oracle")
        return "oracle_extraction_taskgroup.extract_ddl"
    else:
        logging.info(f"Error: Unsupported data source: {data_source}")
        return "end_task"