in native/core/src/execution/planner.rs [2599:2682]
/// Runs a literal filter on top of a scan over a single dictionary-encoded string column
/// and checks that the filtered output column is still dictionary-encoded.
///
/// The input holds 100 rows whose keys cycle through four dictionary values
/// ("foo", "bar", "hello", "comet"); the test expects a quarter of the rows in the output.
fn test_unpack_dictionary_string() {
    // Both the scan field and the literal use the same string data type description.
    let string_type = || spark_expression::DataType {
        type_id: STRING_TYPE_ID,
        type_info: None,
    };

    let scan_op = Operator {
        plan_id: 0,
        children: vec![],
        op_struct: Some(OpStruct::Scan(spark_operator::Scan {
            fields: vec![string_type()],
            source: "".to_string(),
        })),
    };
    let foo_literal = spark_expression::Literal {
        value: Some(literal::Value::StringVal("foo".to_string())),
        datatype: Some(string_type()),
        is_null: false,
    };
    // NOTE(review): presumably builds a filter comparing the scanned column with "foo";
    // confirm against `create_filter_literal`.
    let filter_op = create_filter_literal(scan_op, STRING_TYPE_ID, foo_literal);

    let row_count = 100;
    // Builds a batch with one dictionary column of `row_count` rows. The same batch shape is
    // needed twice: once up front for the scan, and once as the streamed input.
    let make_dictionary_batch = move || {
        let keys = Int32Array::new((0..(row_count as i32)).map(|n| n % 4).collect(), None);
        let values = StringArray::from(vec!["foo", "bar", "hello", "comet"]);
        let column = DictionaryArray::new(keys, Arc::new(values));
        InputBatch::Batch(vec![Arc::new(column)], row_count)
    };

    let planner = PhysicalPlanner::default();
    let (mut scans, datafusion_plan) = planner.create_plan(&filter_op, &mut vec![], 1).unwrap();

    // The scan derives its schema from the input batch, so one must be in place before the
    // plan is executed.
    scans[0].set_input_batch(make_dictionary_batch());

    let session_ctx = SessionContext::new();
    let task_ctx = session_ctx.task_ctx();
    let mut stream = datafusion_plan.native_plan.execute(0, task_ctx).unwrap();

    let runtime = tokio::runtime::Runtime::new().unwrap();
    let (tx, mut rx) = mpsc::channel(1);

    // Producer task: sends the single data batch, followed by the EOF marker.
    runtime.spawn(async move {
        tx.send(make_dictionary_batch()).await.unwrap();
        tx.send(InputBatch::EOF).await.unwrap();
    });

    // Consumer: hand each received batch to the scan, then poll the plan's output once.
    runtime.block_on(async move {
        loop {
            let next_input = rx.recv().await.unwrap();
            scans[0].set_input_batch(next_input);

            match poll!(stream.next()) {
                Poll::Ready(None) => break,
                Poll::Ready(Some(result)) => {
                    let output = result.unwrap_or_else(|e| panic!("got error {}", e));
                    assert_eq!(output.num_rows(), row_count / 4);
                    // string/binary should still be packed with dictionary
                    assert!(matches!(
                        output.column(0).data_type(),
                        DataType::Dictionary(_, _)
                    ));
                }
                // No output available yet; feed the next input batch and poll again.
                Poll::Pending => {}
            }
        }
    });
}