in src/columnar_storage/src/storage.rs [494:536]
fn test_storage_sort_batch() {
let schema = arrow_schema!(("a", UInt8), ("b", UInt8), ("c", UInt8), ("c", UInt8));
let root_dir = temp_dir::TempDir::new().unwrap();
let store = Arc::new(LocalFileSystem::new());
let runtimes = build_runtimes();
runtimes.sst_compact_runtime.clone().block_on(async move {
let storage = ObjectBasedStorage::try_new(
root_dir.path().to_string_lossy().to_string(),
Duration::from_hours(2),
store,
schema.clone(),
1,
StorageConfig::default(),
runtimes,
)
.await
.unwrap();
let batch = record_batch!(
("a", UInt8, vec![2, 1, 3, 4, 8, 6, 5, 7]),
("b", UInt8, vec![1, 3, 4, 8, 2, 6, 5, 7]),
("c", UInt8, vec![8, 6, 2, 4, 3, 1, 5, 7]),
("d", UInt8, vec![2, 7, 4, 6, 1, 3, 5, 8])
)
.unwrap();
let mut sorted_batches = storage.sort_batch(batch).await.unwrap();
let expected_bacth = record_batch!(
("a", UInt8, vec![1, 2, 3, 4, 5, 6, 7, 8]),
("b", UInt8, vec![3, 1, 4, 8, 5, 6, 7, 2]),
("c", UInt8, vec![6, 8, 2, 4, 5, 1, 7, 3]),
("d", UInt8, vec![7, 2, 4, 6, 5, 3, 8, 1])
)
.unwrap();
let mut offset = 0;
while let Some(sorted_batch) = sorted_batches.next().await {
let sorted_batch = sorted_batch.unwrap();
let length = sorted_batch.num_rows();
let batch = expected_bacth.slice(offset, length);
assert_eq!(sorted_batch, batch);
offset += length;
}
});
}