fn set_operation()

in mdb_shard/src/set_operations.rs [103:380]

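Streams a set operation (e.g. union or difference, per `MDBSetOperation`) over two serialized shards: the file info and CAS info sections of both inputs are walked in parallel and written to `out`, the three lookup tables are rebuilt from the surviving entries, and the footer is appended. Returns the `MDBShardInfo` (header plus footer) describing the output shard.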

fn set_operation<R: Read + Seek, W: Write>(
    s: [&MDBShardInfo; 2],
    r: [&mut R; 2],
    out: &mut W,
    op: MDBSetOperation,
) -> Result<MDBShardInfo> {
    let mut out_offset = 0u64;

    let mut footer = MDBShardFileFooter::default();

    // Write out the header to the output.
    let header = MDBShardFileHeader::default();
    out_offset += header.serialize(out)? as u64;

    ///////////////////////////////////
    // File info section.
    // Set up the seeks for the first section:
    r[0].seek(SeekFrom::Start(s[0].metadata.file_info_offset))?;
    r[1].seek(SeekFrom::Start(s[1].metadata.file_info_offset))?;

    footer.file_info_offset = out_offset;

    // This is written later, after this section.
    let mut file_lookup_data = Vec::<(u64, u32)>::new();
    {
        // Manually walk the whole file info section of each shard, emitting
        // file entries according to the set operation.

        let mut current_index = 0;

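        // Read the next file info header; the section is terminated by a
        // "bookend" sentinel header, which we map to None.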
        let load_next = |_r: &mut R, _s: &MDBShardInfo| -> Result<_> {
            let fdsh = FileDataSequenceHeader::deserialize(_r)?;
            if fdsh.is_bookend() {
                Ok(None)
            } else {
                Ok(Some(fdsh))
            }
        };

        let mut file_data_header = [load_next(r[0], s[0])?, load_next(r[1], s[1])?];

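        // get_next_actions_for_file_info compares the two current headers and
        // yields one action per reader; None ends the loop once both sections
        // are exhausted.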
        while let Some(action) =
            get_next_actions_for_file_info(file_data_header[0].as_ref(), file_data_header[1].as_ref(), op)
        {
            for i in [0, 1] {
                match action[i] {
                    NextAction::CopyToOut => {
                        let fh = file_data_header[i].as_ref().unwrap();

                        out_offset += fh.serialize(out)? as u64;

                        for _ in 0..fh.num_entries {
                            let entry = FileDataSequenceEntry::deserialize(r[i])?;
                            footer.materialized_bytes += entry.unpacked_segment_bytes as u64;
                            entry.serialize(out)?;
                        }

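                        // FileDataSequenceEntry serializes to a fixed width, so the
                        // output offset can be advanced in bulk rather than per entry.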
                        out_offset += (fh.num_entries as u64) * (size_of::<FileDataSequenceEntry>() as u64);

                        if fh.contains_verification() {
                            for _ in 0..fh.num_entries {
                                let entry = FileVerificationEntry::deserialize(r[i])?;
                                entry.serialize(out)?;
                            }

                            out_offset += (fh.num_entries as u64) * (size_of::<FileVerificationEntry>() as u64);
                        }

                        if fh.contains_metadata_ext() {
                            let entry = FileMetadataExt::deserialize(r[i])?;
                            out_offset += entry.serialize(out)? as u64;
                        }

                        file_lookup_data.push((truncate_hash(&fh.file_hash), current_index));

                        current_index += 1 + fh.num_info_entry_following();
                        file_data_header[i] = load_next(r[i], s[i])?;
                    },
                    NextAction::SkipOver => {
                        let fh = file_data_header[i].as_ref().unwrap();
                        r[i].seek(SeekFrom::Current(
                            (fh.num_info_entry_following() as i64) * (MDB_FILE_INFO_ENTRY_SIZE as i64),
                        ))?;
                        file_data_header[i] = load_next(r[i], s[i])?;
                    },
                    NextAction::Nothing => {},
                    NextAction::Merge => {
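                        // Both shards carry an entry for the same file hash; emit a
                        // single merged entry whose verification and metadata-ext
                        // sections are taken from whichever side has them.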
                        let fh0 = file_data_header[0].as_ref().unwrap();
                        let fh1 = file_data_header[1].as_ref().unwrap();
                        FileDataSequenceHeader::verify_same_file(fh0, fh1);
                        let has_verification = fh0.contains_verification() || fh1.contains_verification();
                        let has_metadata_ext = fh0.contains_metadata_ext() || fh1.contains_metadata_ext();

                        let header = FileDataSequenceHeader::new(
                            fh0.file_hash,
                            fh0.num_entries,
                            has_verification,
                            has_metadata_ext,
                        );
                        out_offset += header.serialize(out)? as u64;

                        // copy over the entries from fh0 and advance forward with fh1
                        for _ in 0..fh0.num_entries {
                            let entry = FileDataSequenceEntry::deserialize(r[0])?;
                            footer.materialized_bytes += entry.unpacked_segment_bytes as u64;
                            entry.serialize(out)?;
                        }

                        out_offset += (fh0.num_entries as u64) * (size_of::<FileDataSequenceEntry>() as u64);
                        r[1].seek(SeekFrom::Current((fh1.num_entries as i64) * (MDB_FILE_INFO_ENTRY_SIZE as i64)))?;

                        // if we have verification entries, copy them over from the appropriate shard and
                        // advance the other reader
                        if has_verification {
                            let (read_idx, advance_idx) = if fh0.contains_verification() { (0, 1) } else { (1, 0) };
                            let (read_header, advance_header) = if fh0.contains_verification() {
                                (fh0, fh1)
                            } else {
                                (fh1, fh0)
                            };
                            for _ in 0..read_header.num_entries {
                                let entry = FileVerificationEntry::deserialize(r[read_idx])?;
                                out_offset += entry.serialize(out)? as u64;
                            }
                            if advance_header.contains_verification() {
                                r[advance_idx].seek(SeekFrom::Current(
                                    (advance_header.num_entries as i64) * (MDB_FILE_INFO_ENTRY_SIZE as i64),
                                ))?;
                            }
                        }

                        // if we have metadata_ext, copy it over from the appropriate shard and advance the
                        // other reader
                        if has_metadata_ext {
                            let (read_idx, advance_idx) = if fh0.contains_metadata_ext() { (0, 1) } else { (1, 0) };
                            let (_read_header, advance_header) = if fh0.contains_metadata_ext() {
                                (fh0, fh1)
                            } else {
                                (fh1, fh0)
                            };
                            let entry = FileMetadataExt::deserialize(r[read_idx])?;
                            out_offset += entry.serialize(out)? as u64;
                            if advance_header.contains_metadata_ext() {
                                r[advance_idx].seek(SeekFrom::Current(MDB_FILE_INFO_ENTRY_SIZE as i64))?;
                            }
                        }

                        file_lookup_data.push((truncate_hash(&fh0.file_hash), current_index));
                        current_index += 1 + header.num_info_entry_following();

                        // load the next items
                        file_data_header[0] = load_next(r[0], s[0])?;
                        file_data_header[1] = load_next(r[1], s[1])?;
                    },
                };
            }
        }
        out_offset += FileDataSequenceHeader::bookend().serialize(out)? as u64;
    }

    // These are written later.
    let mut cas_lookup_data = Vec::<(u64, u32)>::new();
    let mut chunk_lookup_data = Vec::<(u64, (u32, u32))>::new();

    {
        ///////////////////////////////////
        // CAS info section.
        // Set up the seeks for this section:
        footer.cas_info_offset = out_offset;

        r[0].seek(SeekFrom::Start(s[0].metadata.cas_info_offset))?;
        r[1].seek(SeekFrom::Start(s[1].metadata.cas_info_offset))?;

        let mut current_index = 0;

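        // Same pattern as the file info section: read CAS block headers until
        // the bookend sentinel.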
        let load_next = |_r: &mut R, _s: &MDBShardInfo| -> Result<_> {
            let ccsh = CASChunkSequenceHeader::deserialize(_r)?;
            if ccsh.is_bookend() {
                Ok(None)
            } else {
                Ok(Some(ccsh))
            }
        };

        let mut cas_data_header = [load_next(r[0], s[0])?, load_next(r[1], s[1])?];

        while let Some(action) = get_next_actions(
            cas_data_header[0].as_ref().map(|h| &h.cas_hash),
            cas_data_header[1].as_ref().map(|h| &h.cas_hash),
            op,
        ) {
            for i in [0, 1] {
                match action[i] {
                    NextAction::CopyToOut => {
                        let fh = cas_data_header[i].as_ref().unwrap();
                        footer.stored_bytes_on_disk += fh.num_bytes_on_disk as u64;
                        footer.stored_bytes += fh.num_bytes_in_cas as u64;

                        out_offset += fh.serialize(out)? as u64;

                        for j in 0..fh.num_entries {
                            let chunk = CASChunkSequenceEntry::deserialize(r[i])?;

                            chunk_lookup_data.push((truncate_hash(&chunk.chunk_hash), (current_index, j)));
                            out_offset += chunk.serialize(out)? as u64;
                        }

                        cas_lookup_data.push((truncate_hash(&fh.cas_hash), current_index));

                        current_index += 1 + fh.num_entries;
                        cas_data_header[i] = load_next(r[i], s[i])?;
                    },
                    NextAction::SkipOver => {
                        let fh = cas_data_header[i].as_ref().unwrap();
                        r[i].seek(SeekFrom::Current(
                            (fh.num_entries as i64) * (size_of::<CASChunkSequenceEntry>() as i64),
                        ))?;
                        cas_data_header[i] = load_next(r[i], s[i])?;
                    },
                    NextAction::Nothing => {},
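                    // A Merge action is a no-op for CAS blocks: two blocks with the
                    // same hash carry the same chunks, so the equal-hash case is
                    // presumably resolved as a copy plus a skip by get_next_actions.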
                    NextAction::Merge => {},
                };
            }
        }

        out_offset += CASChunkSequenceHeader::bookend().serialize(out)? as u64;
    }

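    // Each lookup table is a flat array of (truncated hash, index) pairs. The
    // file and cas tables come out of the merge already ordered, since the
    // sections are walked in hash order; the chunk table is sorted explicitly.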
    // The file lookup table
    {
        footer.file_lookup_offset = out_offset;
        footer.file_lookup_num_entry = file_lookup_data.len() as u64;
        out_offset += (file_lookup_data.len() * (size_of::<u64>() + size_of::<u32>())) as u64;
        for (h, idx) in file_lookup_data {
            write_u64(out, h)?;
            write_u32(out, idx)?;
        }
    }

    // The cas lookup table
    {
        // Write out the cas lookup section.
        footer.cas_lookup_offset = out_offset;
        footer.cas_lookup_num_entry = cas_lookup_data.len() as u64;
        out_offset += (cas_lookup_data.len() * (size_of::<u64>() + size_of::<u32>())) as u64;

        for (h, idx) in cas_lookup_data {
            write_u64(out, h)?;
            write_u32(out, idx)?;
        }
    }

    // The chunk lookup table.
    {
        chunk_lookup_data.sort_unstable_by_key(|t| t.0);

        // Write out the chunk lookup section.
        footer.chunk_lookup_offset = out_offset;
        footer.chunk_lookup_num_entry = chunk_lookup_data.len() as u64;
        out_offset += (chunk_lookup_data.len() * (size_of::<u64>() + 2 * size_of::<u32>())) as u64;

        for (h, (i1, i2)) in chunk_lookup_data {
            write_u64(out, h)?;
            write_u32(out, i1)?;
            write_u32(out, i2)?;
        }
    }

    // Finally, write out the footer.
    {
        footer.footer_offset = out_offset;
        footer.serialize(out)?;
    }

    Ok(MDBShardInfo {
        header,
        metadata: footer,
    })
}
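
For context, here is a minimal sketch of how this routine might be wrapped. The wrapper would live alongside set_operation (which is private to the module), and MDBShardInfo::load_from_reader is an assumed loader name, not taken from the source; the real API may differ.

use std::fs::File;
use std::io::{BufReader, BufWriter};
use std::path::Path;

// Hypothetical convenience wrapper: union two shard files into a third.
pub fn shard_union(a: &Path, b: &Path, out_path: &Path) -> Result<MDBShardInfo> {
    let mut r0 = BufReader::new(File::open(a)?);
    let mut r1 = BufReader::new(File::open(b)?);

    // Parse each input's header and footer so the section offsets are known
    // before seeking (assumed loader; see note above).
    let s0 = MDBShardInfo::load_from_reader(&mut r0)?;
    let s1 = MDBShardInfo::load_from_reader(&mut r1)?;

    let mut out = BufWriter::new(File::create(out_path)?);
    set_operation([&s0, &s1], [&mut r0, &mut r1], &mut out, MDBSetOperation::Union)
}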