in crates/core/src/merge/record_merger.rs [203:239]
/// Collects the rows of `batch` as `(commit_seqno, record_key, ts, value)` tuples,
/// sorted by record key, then `ts`, then commit sequence number.
///
/// The commit sequence number and record key are read from the columns named by
/// `MetaField::CommitSeqno` and `MetaField::RecordKey`; `ts` and `value` are read
/// from the columns with those literal names.
///
/// The sort is unstable: rows that tie on all three sort fields (and differ only in
/// `value`) may come out in any relative order.
///
/// # Panics
///
/// Panics if any of the four columns cannot be retrieved, if `ts` or `value` is not
/// an `Int32Array`, or if any cell in those columns is null.
fn get_sorted_rows(batch: &RecordBatch) -> Vec<(String, String, i32, i32)> {
    let seqno = batch
        .get_string_array(MetaField::CommitSeqno.as_ref())
        .expect("failed to read commit seqno column as a string array");
    let keys = batch
        .get_string_array(MetaField::RecordKey.as_ref())
        .expect("failed to read record key column as a string array");

    // Keep the column handles in their own bindings so the downcast arrays can be
    // borrowed from them instead of cloned.
    let ts_column = batch.get_array("ts").expect("failed to read `ts` column");
    let timestamps = ts_column
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("`ts` column is not an Int32Array");
    let value_column = batch
        .get_array("value")
        .expect("failed to read `value` column");
    let values = value_column
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("`value` column is not an Int32Array");

    let mut result: Vec<(String, String, i32, i32)> = seqno
        .iter()
        .zip(keys.iter())
        .zip(timestamps.iter())
        .zip(values.iter())
        .map(|(((s, k), t), v)| {
            (
                s.expect("null commit seqno").to_string(),
                k.expect("null record key").to_string(),
                t.expect("null `ts` value"),
                v.expect("null `value` value"),
            )
        })
        .collect();

    // Compare borrowed (key, ts, seqno) tuples: same ordering as a key-based sort,
    // but without allocating fresh `String`s on every comparison.
    result.sort_unstable_by(|a, b| (&a.1, a.2, &a.0).cmp(&(&b.1, b.2, &b.0)));
    result
}