def _prepare_data_for_next_dag()

in src/translation/dags/controller_dag.py [0:0]


def _prepare_data_for_next_dag(ti, **kwargs):
    """Build the ``next_dag_config`` payload and push it to XCom.

    Reads ``event_type`` and ``config`` (plus ``bucket_id`` / ``object_id``
    for ``OBJECT_FINALIZE`` events) from the XCom values of the
    ``load_config`` task, derives a payload from them, and pushes the result
    under the XCom key ``next_dag_config``.

    Payload by event type:

    * ``OBJECT_FINALIZE`` with ``config["type"]`` in ``ddl`` / ``sql`` /
      ``dml`` and a supported ``config["source"]``: ``{"config": config}``.
    * ``OBJECT_FINALIZE`` with ``config["type"] == "data"``: a dict holding
      ``config``, ``bucket_id`` and ``object_id``.
    * In both cases above, when ``config["validation_only"] == "yes"`` the
      config itself is pushed, unwrapped.
    * ``TRANSFER_RUN_FINISHED`` with ``config["dataSourceId"]`` equal to
      ``on_premises`` or ``redshift``: a dict with ``TRANSFER_RUN_ID``,
      ``TRANSFER_RUN_STATE``, ``SOURCE_SCHEMA`` and ``TABLE_NAME_PATTERN``.

    Every unsupported combination prints a message and ``None`` is pushed.

    Args:
        ti: Object exposing ``xcom_pull`` and ``xcom_push`` (presumably the
            Airflow task instance -- confirm against the operator wiring).
        **kwargs: Accepted for the caller's convenience; not used here.

    Raises:
        KeyError: If ``config`` lacks a key required by the selected branch
            (``type``, ``source``, ``dataSourceId``, ``name``, ``state``,
            ``params`` or the schema / table-pattern entries in ``params``).
    """
    event_type = ti.xcom_pull(key="event_type", task_ids="load_config")
    next_dag_config = None

    if event_type == "OBJECT_FINALIZE":
        config = ti.xcom_pull(key="config", task_ids="load_config")
        bucket_id = ti.xcom_pull(key="bucket_id", task_ids="load_config")
        object_id = ti.xcom_pull(key="object_id", task_ids="load_config")
        op_type = config["type"]
        # Validation-only runs receive the raw config instead of a wrapper.
        validation_only = config.get("validation_only") == "yes"

        if op_type in ["ddl", "sql", "dml"]:
            data_source = config["source"]
            if data_source not in ["teradata", "hive", "oracle", "redshift", "DB2"]:
                print(f"Unsupported data source : {data_source}")
            elif validation_only:
                next_dag_config = config
            else:
                next_dag_config = {"config": config}
        elif op_type == "data":
            if validation_only:
                next_dag_config = config
            else:
                next_dag_config = {
                    "config": config,
                    "bucket_id": bucket_id,
                    "object_id": object_id,
                }
        else:
            print(f"Error: Unsupported operation type: {op_type}")

    elif event_type == "TRANSFER_RUN_FINISHED":
        config = ti.xcom_pull(key="config", task_ids="load_config")
        data_source = config["dataSourceId"]

        # The two supported sources differ only in which ``params`` entry
        # carries the source schema.
        if data_source == "on_premises":
            schema_param = "database_name"
        elif data_source == "redshift":
            schema_param = "redshift_schema"
        else:
            schema_param = None

        if schema_param is None:
            # Previously this case pushed None without any diagnostic.
            print(f"Unsupported transfer data source: {data_source}")
        else:
            next_dag_config = {
                "TRANSFER_RUN_ID": config["name"],
                "TRANSFER_RUN_STATE": config["state"],
                "SOURCE_SCHEMA": config["params"][schema_param],
                "TABLE_NAME_PATTERN": config["params"]["table_name_patterns"],
            }
    else:
        print(f"Unsupported event type: {event_type}")

    ti.xcom_push(key="next_dag_config", value=next_dag_config)