in common/rusty_leveldb_sgx/src/db_impl.rs [792:901]
fn do_compaction_work(&mut self, cs: &mut CompactionState) -> Result<()> {
{
let current = self.vset.borrow().current();
assert!(current.borrow().num_level_files(cs.compaction.level()) > 0);
assert!(cs.builder.is_none());
}
let start_ts = self.opt.env.micros();
log!(
self.opt.log,
"Compacting {} files at L{} and {} files at L{}",
cs.compaction.num_inputs(0),
cs.compaction.level(),
cs.compaction.num_inputs(1),
cs.compaction.level() + 1
);
let mut input = self.vset.borrow().make_input_iterator(&cs.compaction);
input.seek_to_first();
let (mut key, mut val) = (vec![], vec![]);
let mut last_seq_for_key = MAX_SEQUENCE_NUMBER;
let mut have_ukey = false;
let mut current_ukey = vec![];
while input.valid() {
// TODO: Do we need to do a memtable compaction here? Probably not, in the sequential
// case.
assert!(input.current(&mut key, &mut val));
if cs.compaction.should_stop_before(&key) && cs.builder.is_none() {
self.finish_compaction_output(cs, key.clone())?;
}
let (ktyp, seq, ukey) = parse_internal_key(&key);
if seq == 0 {
// Parsing failed.
log!(self.opt.log, "Encountered seq=0 in key: {:?}", &key);
last_seq_for_key = MAX_SEQUENCE_NUMBER;
have_ukey = false;
current_ukey.clear();
input.advance();
continue;
}
if !have_ukey || self.opt.cmp.cmp(ukey, ¤t_ukey) != Ordering::Equal {
// First occurrence of this key.
current_ukey.clear();
current_ukey.extend_from_slice(ukey);
have_ukey = true;
last_seq_for_key = MAX_SEQUENCE_NUMBER;
}
// We can omit the key under the following conditions:
if last_seq_for_key <= cs.smallest_seq {
last_seq_for_key = seq;
input.advance();
continue;
}
// Entry is deletion; no older version is observable by any snapshot; and all entries
// in compacted levels with smaller sequence numbers will
if ktyp == ValueType::TypeDeletion
&& seq <= cs.smallest_seq
&& cs.compaction.is_base_level_for(ukey)
{
last_seq_for_key = seq;
input.advance();
continue;
}
last_seq_for_key = seq;
if cs.builder.is_none() {
let fnum = self.vset.borrow_mut().new_file_number();
let mut fmd = FileMetaData::default();
fmd.num = fnum;
let fname = table_file_name(&self.path, fnum);
let f = self.opt.env.open_writable_file(Path::new(&fname))?;
let f = Box::new(BufWriter::new(f));
cs.builder = Some(TableBuilder::new(self.opt.clone(), f));
cs.outputs.push(fmd);
}
if cs.builder.as_ref().unwrap().entries() == 0 {
cs.current_output().smallest = key.clone();
}
cs.builder.as_mut().unwrap().add(&key, &val)?;
// NOTE: Adjust max file size based on level.
if cs.builder.as_ref().unwrap().size_estimate() > self.opt.max_file_size {
self.finish_compaction_output(cs, key.clone())?;
}
input.advance();
}
if cs.builder.is_some() {
self.finish_compaction_output(cs, key)?;
}
let mut stats = CompactionStats::default();
stats.micros = self.opt.env.micros() - start_ts;
for parent in 0..2 {
for inp in 0..cs.compaction.num_inputs(parent) {
stats.read += cs.compaction.input(parent, inp).size;
}
}
for output in &cs.outputs {
stats.written += output.size;
}
self.cstats[cs.compaction.level()].add(stats);
Ok(())
}