fn do_compaction_work()

in common/rusty_leveldb_sgx/src/db_impl.rs [792:901]


    /// Executes the compaction described by `cs.compaction`.
    ///
    /// All entries of the compaction inputs are visited in iterator order through the merged
    /// iterator obtained from the version set. Entries that are shadowed by a newer entry for the
    /// same user key, as well as obsolete deletion markers, are skipped; everything else is
    /// written to one or more new table files whose metadata is collected in `cs.outputs`.
    /// Finally, timing and byte counts of this run are added to `self.cstats`.
    ///
    /// # Errors
    ///
    /// Returns the error of opening an output file, of adding an entry to the table builder, or
    /// of `finish_compaction_output`.
    ///
    /// # Panics
    ///
    /// Panics if the compaction's level contains no files, if `cs.builder` is already set on
    /// entry, or if the input iterator reports itself valid but yields no current entry.
    fn do_compaction_work(&mut self, cs: &mut CompactionState) -> Result<()> {
        {
            let current = self.vset.borrow().current();
            assert!(current.borrow().num_level_files(cs.compaction.level()) > 0);
            assert!(cs.builder.is_none());
        }
        let start_ts = self.opt.env.micros();
        log!(
            self.opt.log,
            "Compacting {} files at L{} and {} files at L{}",
            cs.compaction.num_inputs(0),
            cs.compaction.level(),
            cs.compaction.num_inputs(1),
            cs.compaction.level() + 1
        );

        let mut input = self.vset.borrow().make_input_iterator(&cs.compaction);
        input.seek_to_first();

        let (mut key, mut val) = (vec![], vec![]);
        let mut last_seq_for_key = MAX_SEQUENCE_NUMBER;

        let mut have_ukey = false;
        let mut current_ukey = vec![];

        // The most recent key that was actually added to the currently open output. This -- and
        // not the key the iterator happens to point at -- is the largest key of that output.
        let mut last_written_key: Vec<u8> = vec![];

        while input.valid() {
            // TODO: Do we need to do a memtable compaction here? Probably not, in the sequential
            // case.
            assert!(input.current(&mut key, &mut val));

            // Close the open output *before* processing `key` if the compaction asks us to stop
            // here. There is only something to close when a builder exists; `key` itself has not
            // been written yet, so the output's largest key is the last one we added.
            if cs.compaction.should_stop_before(&key) && cs.builder.is_some() {
                self.finish_compaction_output(cs, last_written_key.clone())?;
            }

            let (ktyp, seq, ukey) = parse_internal_key(&key);
            if seq == 0 {
                // Parsing failed. Skip the entry and forget the user key seen so far.
                log!(self.opt.log, "Encountered seq=0 in key: {:?}", &key);
                last_seq_for_key = MAX_SEQUENCE_NUMBER;
                have_ukey = false;
                current_ukey.clear();
                input.advance();
                continue;
            }

            if !have_ukey || self.opt.cmp.cmp(ukey, &current_ukey) != Ordering::Equal {
                // First occurrence of this key.
                current_ukey.clear();
                current_ukey.extend_from_slice(ukey);
                have_ukey = true;
                last_seq_for_key = MAX_SEQUENCE_NUMBER;
            }

            // We can omit the key under the following conditions:
            //
            // (A) A previous entry for the same user key already had a sequence number at or
            // below `cs.smallest_seq`, so this (later) entry is hidden by it.
            if last_seq_for_key <= cs.smallest_seq {
                last_seq_for_key = seq;
                input.advance();
                continue;
            }
            // (B) Entry is deletion; no older version is observable by any snapshot; and all
            // entries in compacted levels with smaller sequence numbers will be dropped in the
            // next few iterations of this loop (by rule (A) above). The deletion marker is
            // therefore obsolete.
            if ktyp == ValueType::TypeDeletion
                && seq <= cs.smallest_seq
                && cs.compaction.is_base_level_for(ukey)
            {
                last_seq_for_key = seq;
                input.advance();
                continue;
            }

            last_seq_for_key = seq;

            // Lazily open a new output table the first time an entry needs to be written.
            if cs.builder.is_none() {
                let fnum = self.vset.borrow_mut().new_file_number();
                let mut fmd = FileMetaData::default();
                fmd.num = fnum;

                let fname = table_file_name(&self.path, fnum);
                let f = self.opt.env.open_writable_file(Path::new(&fname))?;
                let f = Box::new(BufWriter::new(f));
                cs.builder = Some(TableBuilder::new(self.opt.clone(), f));
                cs.outputs.push(fmd);
            }
            if cs.builder.as_ref().unwrap().entries() == 0 {
                cs.current_output().smallest = key.clone();
            }
            cs.builder.as_mut().unwrap().add(&key, &val)?;
            last_written_key.clear();
            last_written_key.extend_from_slice(&key);

            // NOTE: Adjust max file size based on level.
            if cs.builder.as_ref().unwrap().size_estimate() > self.opt.max_file_size {
                self.finish_compaction_output(cs, key.clone())?;
            }

            input.advance();
        }

        // Flush the last, still open output. A builder only exists after at least one successful
        // `add`, so `last_written_key` is the largest key contained in it.
        if cs.builder.is_some() {
            self.finish_compaction_output(cs, last_written_key)?;
        }

        // Account for the elapsed time, the bytes of all input files (both input sets) and the
        // bytes of all produced output files.
        let mut stats = CompactionStats::default();
        stats.micros = self.opt.env.micros() - start_ts;
        for parent in 0..2 {
            for inp in 0..cs.compaction.num_inputs(parent) {
                stats.read += cs.compaction.input(parent, inp).size;
            }
        }
        for output in &cs.outputs {
            stats.written += output.size;
        }
        self.cstats[cs.compaction.level()].add(stats);
        Ok(())
    }