fn bench_write()

in src/server/src/main.rs [187:233]


fn bench_write(
    storage: ColumnarStorageRef,
    rt: RuntimeRef,
    workers: usize,
    interval: Duration,
    keep_writing: Arc<AtomicBool>,
) {
    let schema = Arc::new(Schema::new(vec![
        Field::new("pk1", DataType::Int64, true),
        Field::new("pk2", DataType::Int64, true),
        Field::new("pk3", DataType::Int64, true),
        Field::new("value", DataType::Int64, true),
    ]));
    for _ in 0..workers {
        let storage = storage.clone();
        let schema = schema.clone();
        let keep_writing = keep_writing.clone();
        rt.spawn(async move {
            loop {
                tokio::time::sleep(interval).await;
                if !keep_writing.load(Ordering::Relaxed) {
                    continue;
                }
                let pk1: Int64Array = repeat_with(rand::random::<i64>).take(1000).collect();
                let pk2: Int64Array = repeat_with(rand::random::<i64>).take(1000).collect();
                let pk3: Int64Array = repeat_with(rand::random::<i64>).take(1000).collect();
                let value: Int64Array = repeat_with(rand::random::<i64>).take(1000).collect();
                let batch = RecordBatch::try_new(
                    schema.clone(),
                    vec![Arc::new(pk1), Arc::new(pk2), Arc::new(pk3), Arc::new(value)],
                )
                .unwrap();
                let now = common::now();
                if let Err(e) = storage
                    .write(WriteRequest {
                        batch,
                        enable_check: false,
                        time_range: (now..now + 1).into(),
                    })
                    .await
                {
                    error!("write failed, err:{}", e);
                }
            }
        });
    }
}