in native/core/src/execution/planner.rs [2831:2953]
fn test_create_array() {
    // Register DataFusion's `make_array` UDF so the planner can resolve the
    // scalar function "make_array" by name when translating the Spark plan.
    let session_ctx = SessionContext::new();
    session_ctx.register_udf(ScalarUDF::from(
        datafusion_functions_nested::make_array::MakeArray::new(),
    ));
    let task_ctx = session_ctx.task_ctx();
    let planner = PhysicalPlanner::new(Arc::from(session_ctx));
    // Create a plan for
    // ProjectionExec: expr=[make_array(col_0@0, col_1@1) as col_0]
    //   ScanExec: source=[CometScan parquet (unknown)], schema=[col_0: Int32, col_1: Int32, col_2: Int32]
    // NOTE: the scan exposes three Int32 columns but the projection only
    // references the first two; col_2 is deliberately unused.
    let op_scan = Operator {
        plan_id: 0,
        children: vec![],
        op_struct: Some(OpStruct::Scan(spark_operator::Scan {
            fields: vec![
                spark_expression::DataType {
                    type_id: 3, // Int32
                    type_info: None,
                },
                spark_expression::DataType {
                    type_id: 3, // Int32
                    type_info: None,
                },
                spark_expression::DataType {
                    type_id: 3, // Int32
                    type_info: None,
                },
            ],
            source: "".to_string(),
        })),
    };
    // Bound reference to input column 0 (Int32).
    let array_col = spark_expression::Expr {
        expr_struct: Some(Bound(spark_expression::BoundReference {
            index: 0,
            datatype: Some(spark_expression::DataType {
                type_id: 3,
                type_info: None,
            }),
        })),
    };
    // Bound reference to input column 1 (Int32).
    let array_col_1 = spark_expression::Expr {
        expr_struct: Some(Bound(spark_expression::BoundReference {
            index: 1,
            datatype: Some(spark_expression::DataType {
                type_id: 3,
                type_info: None,
            }),
        })),
    };
    // Projection with a single expression: make_array(col_0, col_1).
    let projection = Operator {
        children: vec![op_scan],
        plan_id: 0,
        op_struct: Some(OpStruct::Projection(spark_operator::Projection {
            project_list: vec![spark_expression::Expr {
                expr_struct: Some(ExprStruct::ScalarFunc(spark_expression::ScalarFunc {
                    func: "make_array".to_string(),
                    args: vec![array_col, array_col_1],
                    return_type: None,
                })),
            }],
        })),
    };
    // Two-row input batch: col_0 = [0, 3], col_1 = [1, 4], col_2 = [2, 5].
    let a = Int32Array::from(vec![0, 3]);
    let b = Int32Array::from(vec![1, 4]);
    let c = Int32Array::from(vec![2, 5]);
    let input_batch = InputBatch::Batch(vec![Arc::new(a), Arc::new(b), Arc::new(c)], 2);
    let (mut scans, datafusion_plan) =
        planner.create_plan(&projection, &mut vec![], 1).unwrap();
    // Prime the scan with a batch before executing so the first poll has data.
    scans[0].set_input_batch(input_batch);
    let mut stream = datafusion_plan.native_plan.execute(0, task_ctx).unwrap();
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let (tx, mut rx) = mpsc::channel(1);
    // Separate thread to send the EOF signal once we've processed the only input batch
    runtime.spawn(async move {
        // Re-create the same two-row batch (the arrays were moved above),
        // then follow it with EOF so the consumer loop can terminate.
        let a = Int32Array::from(vec![0, 3]);
        let b = Int32Array::from(vec![1, 4]);
        let c = Int32Array::from(vec![2, 5]);
        let input_batch1 = InputBatch::Batch(vec![Arc::new(a), Arc::new(b), Arc::new(c)], 2);
        let input_batch2 = InputBatch::EOF;
        let batches = vec![input_batch1, input_batch2];
        for batch in batches.into_iter() {
            tx.send(batch).await.unwrap();
        }
    });
    runtime.block_on(async move {
        // Feed each received batch to the scan, then poll the stream exactly
        // once per batch: Ready(Some) => verify output, Ready(None) => EOF
        // reached, Pending => keep looping for the next input.
        loop {
            let batch = rx.recv().await.unwrap();
            scans[0].set_input_batch(batch);
            match poll!(stream.next()) {
                Poll::Ready(Some(batch)) => {
                    assert!(batch.is_ok(), "got error {}", batch.unwrap_err());
                    let batch = batch.unwrap();
                    assert_eq!(batch.num_rows(), 2);
                    // make_array zips col_0 and col_1 row-wise into lists.
                    let expected = [
                        "+--------+",
                        "| col_0  |",
                        "+--------+",
                        "| [0, 1] |",
                        "| [3, 4] |",
                        "+--------+",
                    ];
                    assert_batches_eq!(expected, &[batch]);
                }
                Poll::Ready(None) => {
                    break;
                }
                _ => {}
            }
        }
    });
}