in src/server/src/main.rs [187:233]
fn bench_write(
    storage: ColumnarStorageRef,
    rt: RuntimeRef,
    workers: usize,
    interval: Duration,
    keep_writing: Arc<AtomicBool>,
) {
    // All workers share the same four-column schema: three primary-key
    // columns plus a value column, all nullable Int64.
    let schema = Arc::new(Schema::new(vec![
        Field::new("pk1", DataType::Int64, true),
        Field::new("pk2", DataType::Int64, true),
        Field::new("pk3", DataType::Int64, true),
        Field::new("value", DataType::Int64, true),
    ]));
    // Spawn one long-running writer task per worker on the provided runtime.
    for _ in 0..workers {
        let storage = storage.clone();
        let schema = schema.clone();
        let keep_writing = keep_writing.clone();
        rt.spawn(async move {
            loop {
                tokio::time::sleep(interval).await;
                // When the flag is cleared the task keeps ticking but skips the
                // write, so writes resume as soon as the flag is set again.
                if !keep_writing.load(Ordering::Relaxed) {
                    continue;
                }
                // Build a 1000-row batch of random Int64 values for every column.
                let pk1: Int64Array = repeat_with(rand::random::<i64>).take(1000).collect();
                let pk2: Int64Array = repeat_with(rand::random::<i64>).take(1000).collect();
                let pk3: Int64Array = repeat_with(rand::random::<i64>).take(1000).collect();
                let value: Int64Array = repeat_with(rand::random::<i64>).take(1000).collect();
                let batch = RecordBatch::try_new(
                    schema.clone(),
                    vec![Arc::new(pk1), Arc::new(pk2), Arc::new(pk3), Arc::new(value)],
                )
                .unwrap();
                // Stamp the batch with a one-unit time range starting at the
                // current timestamp; on failure, log the error and keep writing.
                let now = common::now();
                if let Err(e) = storage
                    .write(WriteRequest {
                        batch,
                        enable_check: false,
                        time_range: (now..now + 1).into(),
                    })
                    .await
                {
                    error!("write failed, err:{}", e);
                }
            }
        });
    }
}
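
For context, a hedged sketch of how this helper might be wired up by the caller; `storage` and `runtime` stand in for whatever the surrounding server init code actually provides, and the worker count and interval below are arbitrary illustration values, not values taken from the source.

    // Hypothetical call site, shown only to illustrate the knobs bench_write exposes.
    let keep_writing = Arc::new(AtomicBool::new(true));
    bench_write(
        storage.clone(),             // ColumnarStorageRef built elsewhere (placeholder)
        runtime.clone(),             // RuntimeRef built elsewhere (placeholder)
        4,                           // spawn four concurrent writer tasks
        Duration::from_millis(100),  // each task writes one batch every 100 ms
        keep_writing.clone(),
    );
    // Clearing the flag pauses writes without cancelling the spawned tasks.
    keep_writing.store(false, Ordering::Relaxed);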