in src/translation/dags/controller_dag.py [0:0]
def _prepare_data_for_next_dag(ti, **kwargs):
event_type = ti.xcom_pull(key="event_type", task_ids="load_config")
next_dag_config = None
if event_type == "OBJECT_FINALIZE":
config = ti.xcom_pull(key="config", task_ids="load_config")
bucket_id = ti.xcom_pull(key="bucket_id", task_ids="load_config")
object_id = ti.xcom_pull(key="object_id", task_ids="load_config")
op_type = config["type"]
if op_type in ["ddl", "sql", "dml"]:
data_source = config["source"]
if data_source in ["teradata", "hive", "oracle", "redshift", "DB2"]:
if "validation_only" in config and config["validation_only"] == "yes":
next_dag_config = config
else:
next_dag_config = {"config": config}
else:
print(f"Unsupported data source : {data_source}")
elif op_type == "data":
if "validation_only" in config and config["validation_only"] == "yes":
next_dag_config = config
else:
next_dag_config = {
"config": config,
"bucket_id": bucket_id,
"object_id": object_id,
}
else:
print(f"Error: Unsupported operation type: {op_type}")
elif event_type == "TRANSFER_RUN_FINISHED":
config = ti.xcom_pull(key="config", task_ids="load_config")
data_source = config["dataSourceId"]
if data_source == "on_premises":
next_dag_config = {
"TRANSFER_RUN_ID": config["name"],
"TRANSFER_RUN_STATE": config["state"],
"SOURCE_SCHEMA": config["params"]["database_name"],
"TABLE_NAME_PATTERN": config["params"]["table_name_patterns"],
}
elif data_source == "redshift":
next_dag_config = {
"TRANSFER_RUN_ID": config["name"],
"TRANSFER_RUN_STATE": config["state"],
"SOURCE_SCHEMA": config["params"]["redshift_schema"],
"TABLE_NAME_PATTERN": config["params"]["table_name_patterns"],
}
else:
print(f"Unsupported event type: {event_type}")
ti.xcom_push(key="next_dag_config", value=next_dag_config)