in mdb_shard/src/set_operations.rs [103:380]
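/// Applies the set operation `op` to the two shards `s`, streaming the result
/// to `out`. Each section (file info, CAS info, and the three lookup tables)
/// is produced in a single pass over both inputs, and the footer describing
/// the new shard is written last.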
fn set_operation<R: Read + Seek, W: Write>(
s: [&MDBShardInfo; 2],
r: [&mut R; 2],
out: &mut W,
op: MDBSetOperation,
) -> Result<MDBShardInfo> {
let mut out_offset = 0u64;
let mut footer = MDBShardFileFooter::default();
// Write out the header to the output.
let header = MDBShardFileHeader::default();
out_offset += header.serialize(out)? as u64;
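    // Each serialize() call returns the number of bytes written; these are
    // accumulated into `out_offset` so section offsets can be recorded in the footer.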
///////////////////////////////////
// File info section.
    // Seek both readers to the start of the file info section:
r[0].seek(SeekFrom::Start(s[0].metadata.file_info_offset))?;
r[1].seek(SeekFrom::Start(s[1].metadata.file_info_offset))?;
footer.file_info_offset = out_offset;
    // Collected during this pass and written out later as the file lookup table.
    let mut file_lookup_data = Vec::<(u64, u32)>::new();
{
        // Manually walk the whole file info section of both shards, applying
        // the set operation entry by entry.
let mut current_index = 0;
let load_next = |_r: &mut R, _s: &MDBShardInfo| -> Result<_> {
let fdsh = FileDataSequenceHeader::deserialize(_r)?;
if fdsh.is_bookend() {
Ok(None)
} else {
Ok(Some(fdsh))
}
};
let mut file_data_header = [load_next(r[0], s[0])?, load_next(r[1], s[1])?];
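        // Walk both file info sections in tandem: `get_next_actions_for_file_info`
        // compares the current headers and yields one action per shard (copy,
        // skip, merge, or nothing) until both sections are exhausted.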
while let Some(action) =
get_next_actions_for_file_info(file_data_header[0].as_ref(), file_data_header[1].as_ref(), op)
{
for i in [0, 1] {
match action[i] {
NextAction::CopyToOut => {
let fh = file_data_header[i].as_ref().unwrap();
out_offset += fh.serialize(out)? as u64;
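                        // Copy each entry through, accumulating the unpacked
                        // size into the footer's materialized byte count.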
for _ in 0..fh.num_entries {
let entry = FileDataSequenceEntry::deserialize(r[i])?;
footer.materialized_bytes += entry.unpacked_segment_bytes as u64;
entry.serialize(out)?;
}
out_offset += (fh.num_entries as u64) * (size_of::<FileDataSequenceEntry>() as u64);
if fh.contains_verification() {
for _ in 0..fh.num_entries {
let entry = FileVerificationEntry::deserialize(r[i])?;
entry.serialize(out)?;
}
out_offset += (fh.num_entries as u64) * (size_of::<FileVerificationEntry>() as u64);
}
if fh.contains_metadata_ext() {
let entry = FileMetadataExt::deserialize(r[i])?;
out_offset += entry.serialize(out)? as u64;
}
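                        // Record the file for the lookup table, then advance the
                        // index past the header and all entries that follow it.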
file_lookup_data.push((truncate_hash(&fh.file_hash), current_index));
current_index += 1 + fh.num_info_entry_following();
file_data_header[i] = load_next(r[i], s[i])?;
},
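                    // This file is excluded from the result; seek past its
                    // entries without copying anything.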
NextAction::SkipOver => {
let fh = file_data_header[i].as_ref().unwrap();
r[i].seek(SeekFrom::Current(
(fh.num_info_entry_following() as i64) * (MDB_FILE_INFO_ENTRY_SIZE as i64),
))?;
file_data_header[i] = load_next(r[i], s[i])?;
},
NextAction::Nothing => {},
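                    // Merge: both shards carry the same file. Emit a single
                    // header whose verification / metadata flags are the union
                    // of the two inputs, then pull each optional section from
                    // whichever shard actually has it.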
NextAction::Merge => {
let fh0 = file_data_header[0].as_ref().unwrap();
let fh1 = file_data_header[1].as_ref().unwrap();
FileDataSequenceHeader::verify_same_file(fh0, fh1);
let has_verification = fh0.contains_verification() || fh1.contains_verification();
let has_metadata_ext = fh0.contains_metadata_ext() || fh1.contains_metadata_ext();
let header = FileDataSequenceHeader::new(
fh0.file_hash,
fh0.num_entries,
has_verification,
has_metadata_ext,
);
out_offset += header.serialize(out)? as u64;
                        // Copy the entries from shard 0 and skip shard 1 past
                        // its identical copies.
for _ in 0..fh0.num_entries {
let entry = FileDataSequenceEntry::deserialize(r[0])?;
footer.materialized_bytes += entry.unpacked_segment_bytes as u64;
entry.serialize(out)?;
}
out_offset += (fh0.num_entries as u64) * (size_of::<FileDataSequenceEntry>() as u64);
r[1].seek(SeekFrom::Current((fh1.num_entries as i64) * (MDB_FILE_INFO_ENTRY_SIZE as i64)))?;
                        // If either shard has verification entries, copy them from
                        // the shard that has them and advance the other reader past
                        // its copy (if present).
if has_verification {
let (read_idx, advance_idx) = if fh0.contains_verification() { (0, 1) } else { (1, 0) };
let (read_header, advance_header) = if fh0.contains_verification() {
(fh0, fh1)
} else {
(fh1, fh0)
};
for _ in 0..read_header.num_entries {
let entry = FileVerificationEntry::deserialize(r[read_idx])?;
out_offset += entry.serialize(out)? as u64;
}
if advance_header.contains_verification() {
r[advance_idx].seek(SeekFrom::Current(
(advance_header.num_entries as i64) * (MDB_FILE_INFO_ENTRY_SIZE as i64),
))?;
}
}
                        // If either shard has metadata_ext, copy it from the shard
                        // that has it and advance the other reader past its copy.
if has_metadata_ext {
let (read_idx, advance_idx) = if fh0.contains_metadata_ext() { (0, 1) } else { (1, 0) };
let (_read_header, advance_header) = if fh0.contains_metadata_ext() {
(fh0, fh1)
} else {
(fh1, fh0)
};
let entry = FileMetadataExt::deserialize(r[read_idx])?;
out_offset += entry.serialize(out)? as u64;
if advance_header.contains_metadata_ext() {
r[advance_idx].seek(SeekFrom::Current(MDB_FILE_INFO_ENTRY_SIZE as i64))?;
}
}
file_lookup_data.push((truncate_hash(&fh0.file_hash), current_index));
current_index += 1 + header.num_info_entry_following();
                        // Load the next header from each shard.
file_data_header[0] = load_next(r[0], s[0])?;
file_data_header[1] = load_next(r[1], s[1])?;
},
};
}
}
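        // Terminate the file info section with a bookend header.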
out_offset += FileDataSequenceHeader::bookend().serialize(out)? as u64;
}
    // Collected during the CAS pass and written out later as lookup tables.
let mut cas_lookup_data = Vec::<(u64, u32)>::new();
let mut chunk_lookup_data = Vec::<(u64, (u32, u32))>::new();
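    // Each chunk row maps a truncated chunk hash to (CAS block index, chunk
    // position within that block).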
{
///////////////////////////////////
// CAS info section.
        // Seek both readers to the start of the CAS info section:
footer.cas_info_offset = out_offset;
r[0].seek(SeekFrom::Start(s[0].metadata.cas_info_offset))?;
r[1].seek(SeekFrom::Start(s[1].metadata.cas_info_offset))?;
let mut current_index = 0;
let load_next = |_r: &mut R, _s: &MDBShardInfo| -> Result<_> {
let ccsh = CASChunkSequenceHeader::deserialize(_r)?;
if ccsh.is_bookend() {
Ok(None)
} else {
Ok(Some(ccsh))
}
};
let mut cas_data_header = [load_next(r[0], s[0])?, load_next(r[1], s[1])?];
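        // Walk both CAS info sections in tandem, keyed on the CAS block hash.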
while let Some(action) = get_next_actions(
cas_data_header[0].as_ref().map(|h| &h.cas_hash),
cas_data_header[1].as_ref().map(|h| &h.cas_hash),
op,
) {
for i in [0, 1] {
match action[i] {
NextAction::CopyToOut => {
let fh = cas_data_header[i].as_ref().unwrap();
footer.stored_bytes_on_disk += fh.num_bytes_on_disk as u64;
footer.stored_bytes += fh.num_bytes_in_cas as u64;
out_offset += fh.serialize(out)? as u64;
for j in 0..fh.num_entries {
let chunk = CASChunkSequenceEntry::deserialize(r[i])?;
chunk_lookup_data.push((truncate_hash(&chunk.chunk_hash), (current_index, j)));
out_offset += chunk.serialize(out)? as u64;
}
cas_lookup_data.push((truncate_hash(&fh.cas_hash), current_index));
current_index += 1 + fh.num_entries;
cas_data_header[i] = load_next(r[i], s[i])?;
},
NextAction::SkipOver => {
let fh = cas_data_header[i].as_ref().unwrap();
r[i].seek(SeekFrom::Current(
(fh.num_entries as i64) * (size_of::<CASChunkSequenceEntry>() as i64),
))?;
cas_data_header[i] = load_next(r[i], s[i])?;
},
NextAction::Nothing => {},
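                    // CAS blocks with equal hashes are identical, so no
                    // entry-level merge is needed here.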
NextAction::Merge => {},
};
}
}
out_offset += CASChunkSequenceHeader::bookend().serialize(out)? as u64;
}
    // The file lookup table.
{
footer.file_lookup_offset = out_offset;
footer.file_lookup_num_entry = file_lookup_data.len() as u64;
out_offset += (file_lookup_data.len() * (size_of::<u64>() + size_of::<u32>())) as u64;
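        // Each row is the truncated file hash (u64) followed by the index (u32)
        // of the file's header within the file info section.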
for (h, idx) in file_lookup_data {
write_u64(out, h)?;
write_u32(out, idx)?;
}
}
    // The CAS lookup table.
    {
        // Write out the CAS lookup section.
footer.cas_lookup_offset = out_offset;
footer.cas_lookup_num_entry = cas_lookup_data.len() as u64;
out_offset += (cas_lookup_data.len() * (size_of::<u64>() + size_of::<u32>())) as u64;
for (h, idx) in cas_lookup_data {
write_u64(out, h)?;
write_u32(out, idx)?;
}
}
// The chunk lookup table.
{
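        // Sort by truncated chunk hash so the table can be queried by hash.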
chunk_lookup_data.sort_unstable_by_key(|t| t.0);
        // Write out the chunk lookup section.
footer.chunk_lookup_offset = out_offset;
footer.chunk_lookup_num_entry = chunk_lookup_data.len() as u64;
out_offset += (chunk_lookup_data.len() * (size_of::<u64>() + 2 * size_of::<u32>())) as u64;
for (h, (i1, i2)) in chunk_lookup_data {
write_u64(out, h)?;
write_u32(out, i1)?;
write_u32(out, i2)?;
}
}
    // Finally, write out the footer.
{
footer.footer_offset = out_offset;
footer.serialize(out)?;
}
Ok(MDBShardInfo {
header,
metadata: footer,
})
}
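
// Hypothetical usage sketch (the loader name and `MDBSetOperation` variant
// below are assumptions, not taken from this file): given two open shard
// files, a union could be streamed to a new shard roughly like
//
//     let info_a = MDBShardInfo::load_from_reader(&mut f_a)?;
//     let info_b = MDBShardInfo::load_from_reader(&mut f_b)?;
//     let mut out = std::fs::File::create("merged.mdb")?;
//     let merged = set_operation(
//         [&info_a, &info_b],
//         [&mut f_a, &mut f_b],
//         &mut out,
//         MDBSetOperation::Union,
//     )?;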