in src/columnar_storage/src/storage.rs [392:491]
// Writes two record batches that share some primary keys into an
// `ObjectBasedStorage`, then checks what `scan` yields: first with no
// predicate, then filtered by `pk1 = 11`.
//
// The keys (11, 100), (9, 1) and (10, 2) occur in both writes. The expected
// output holds one row per distinct (pk1, pk2) pair, and for those repeated
// keys it carries the value supplied by the second write.
fn test_storage_write_and_scan() {
    let schema = arrow_schema!(("pk1", UInt8), ("pk2", UInt8), ("value", Int64));
    let root_dir = temp_dir::TempDir::new().unwrap();
    let store = Arc::new(LocalFileSystem::new());
    let runtimes = build_runtimes();

    runtimes.sst_compact_runtime.clone().block_on(async move {
        // `root_dir` is captured by the future, so the temporary directory
        // stays alive for as long as the storage is being exercised.
        let root_path = root_dir.path().to_string_lossy().to_string();
        let storage = ObjectBasedStorage::try_new(
            root_path,
            Duration::from_hours(2),
            store,
            schema.clone(),
            2, // num_primary_keys
            StorageConfig::default(),
            runtimes,
        )
        .await
        .unwrap();

        // First write: note that the key (11, 100) appears twice in this batch.
        let first_write = WriteRequest {
            batch: record_batch!(
                ("pk1", UInt8, vec![11, 11, 9, 10, 5]),
                ("pk2", UInt8, vec![100, 100, 1, 2, 3]),
                ("value", Int64, vec![2, 7, 4, 6, 1])
            )
            .unwrap(),
            time_range: (1..10).into(),
            enable_check: true,
        };
        storage.write(first_write).await.unwrap();

        // Second write: repeats three keys from the first write with new
        // values and introduces the key (11, 99).
        let second_write = WriteRequest {
            batch: record_batch!(
                ("pk1", UInt8, vec![11, 11, 9, 10]),
                ("pk2", UInt8, vec![100, 99, 1, 2]),
                ("value", Int64, vec![22, 77, 44, 66])
            )
            .unwrap(),
            time_range: (10..20).into(),
            enable_check: true,
        };
        storage.write(second_write).await.unwrap();

        // Scan everything: no predicate, no projection.
        let scan_everything = ScanRequest {
            range: TimeRange::new(Timestamp(0), Timestamp::MAX),
            predicate: vec![],
            projections: None,
        };
        let all_rows = storage.scan(scan_everything).await.unwrap();
        let expected_all = [
            record_batch!(
                ("pk1", UInt8, vec![5, 9, 10, 11]),
                ("pk2", UInt8, vec![3, 1, 2, 99]),
                ("value", Int64, vec![1, 44, 66, 77])
            )
            .unwrap(),
            record_batch!(
                ("pk1", UInt8, vec![11]),
                ("pk2", UInt8, vec![100]),
                ("value", Int64, vec![22])
            )
            .unwrap(),
        ];
        check_stream(all_rows, expected_all).await;

        // test with predicate
        let pk1_is_eleven = col("pk1").eq(lit(11_u8));
        let scan_filtered = ScanRequest {
            range: TimeRange::new(Timestamp(0), Timestamp::MAX),
            predicate: vec![pk1_is_eleven],
            projections: None,
        };
        let filtered_rows = storage.scan(scan_filtered).await.unwrap();
        let expected_filtered = [
            record_batch!(
                ("pk1", UInt8, vec![11]),
                ("pk2", UInt8, vec![99]),
                ("value", Int64, vec![77])
            )
            .unwrap(),
            record_batch!(
                ("pk1", UInt8, vec![11]),
                ("pk2", UInt8, vec![100]),
                ("value", Int64, vec![22])
            )
            .unwrap(),
        ];
        check_stream(filtered_rows, expected_filtered).await;
    });
}