in src/columnar_storage/src/compaction/executor.rs [93:114]
/// Validates `task` and reserves its input size against the memory budget.
///
/// On success the task's input size has been added to
/// `self.inner.inused_memory`.
/// NOTE(review): the matching release of this reservation is not visible in
/// this block — presumably done by the caller when the task ends; confirm.
///
/// # Errors
///
/// Returns an error, leaving the counter untouched, when adding the task's
/// input size to the in-use counter would exceed `self.inner.mem_limit`
/// (or overflow the counter type).
///
/// # Panics
///
/// Panics if `task.inputs` is empty, or if any file in `task.inputs` or
/// `task.expireds` does not report `is_compaction()`.
fn pre_check(&self, task: &Task) -> Result<()> {
    assert!(!task.inputs.is_empty());
    for f in &task.inputs {
        assert!(f.is_compaction());
    }
    for f in &task.expireds {
        assert!(f.is_compaction());
    }

    let task_size = task.input_size();
    let mem_limit = self.inner.mem_limit;

    // Check the limit and reserve in a single atomic read-modify-write.
    // A separate load followed by `fetch_add` lets concurrent callers all
    // pass the check against the same stale value and jointly exceed the
    // limit. `checked_add` also rejects a sum that would overflow instead of
    // wrapping around and slipping under the limit.
    let reservation = self.inner.inused_memory.fetch_update(
        Ordering::Relaxed,
        Ordering::Relaxed,
        |current| {
            current
                .checked_add(task_size)
                .filter(|total| *total <= mem_limit)
        },
    );

    // Both variants carry the counter value observed before the update; it is
    // only needed for the error message below.
    let inused = match reservation {
        Ok(observed) | Err(observed) => observed,
    };
    ensure!(
        reservation.is_ok(),
        "Compaction memory usage too high, inused:{inused}, task_size:{task_size}, limit:{mem_limit}"
    );
    Ok(())
}