in mdb_shard/src/shard_format.rs [1022:1240]
/// Streams a shard from `reader` to `writer`, applying `hmac_key` to every chunk hash and
/// regenerating the lookup tables and footer to match the emitted content.
///
/// The input is consumed sequentially in this order: header, file info section, CAS info
/// section. The output is written in this order: header, file info section, CAS info
/// section, file lookup table, CAS lookup table, chunk lookup table, footer.
///
/// # Arguments
///
/// * `reader` - Source shard; must be positioned at the start of the shard header.
/// * `writer` - Destination for the exported shard.
/// * `hmac_key` - Key applied to each chunk hash. If it equals `HMACKey::default()`, the chunk
///   hashes are written unchanged. The key is always recorded in the output footer.
/// * `key_valid_for` - Added to the current system time to produce the footer's key expiry.
/// * `include_file_info` - When `false`, only the file info bookend is written; every file
///   record in the input is read and dropped, and the file lookup table is empty.
/// * `include_cas_lookup_table` - When `false`, the CAS lookup table is written empty.
/// * `include_chunk_lookup_table` - When `false`, the chunk lookup table is written empty.
/// * `self_verification` - When present, debug builds assert that the reader position and the
///   lookup table sizes agree with the offsets / counts recorded in this shard's metadata.
///
/// # Returns
///
/// The running total of bytes written to `writer`, including the footer.
///
/// # Errors
///
/// Propagates any error from deserializing the input, serializing the output, or the
/// underlying I/O.
///
/// # Panics
///
/// `SystemTime + Duration` panics on overflow, so an extremely large `key_valid_for` can
/// panic when the expiry timestamp is computed.
fn export_as_keyed_shard_impl<R: Read + Seek, W: Write>(
    reader: &mut R,
    writer: &mut W,
    hmac_key: HMACKey,
    key_valid_for: Duration,
    include_file_info: bool,
    include_cas_lookup_table: bool,
    include_chunk_lookup_table: bool,
    // Pass this in when we have it so we can use debug asserts for verification checking in tests.
    self_verification: Option<&Self>,
) -> Result<usize> {
    // The footer at the end that will hold the offsets and sizes of each section.
    let mut out_footer = MDBShardFileFooter::default();

    // Read in the header, verifying all the information.
    let in_header = MDBShardFileHeader::deserialize(reader)?;

    // Dump out the header; `byte_pos` tracks the current write offset from here on.
    let mut byte_pos = 0;
    byte_pos += in_header.serialize(writer)?;

    // ---- File info section ----
    out_footer.file_info_offset = byte_pos as u64;

    // (truncated file hash, entry index) pairs for the file lookup table.
    let mut file_lookup = Vec::<(u64, u32)>::new();

    // Index of the next file header, counted in file-info entries written so far.
    let mut index: u32 = 0;

    // Sum of unpacked segment bytes over all files that are written out.
    let mut materialized_bytes = 0;

    loop {
        let file_metadata = FileDataSequenceHeader::deserialize(reader)?;

        if file_metadata.is_bookend() {
            // The bookend is always written, even when file info is excluded.
            byte_pos += file_metadata.serialize(writer)?;
            break;
        }

        let num_entries = file_metadata.num_entries as usize;

        // Bytes that follow the data sequence entries: optional verification entries
        // (one per data entry) and an optional metadata extension record.
        let mut n_extended_bytes = 0;

        if file_metadata.contains_verification() {
            n_extended_bytes += num_entries * size_of::<FileVerificationEntry>();
        }

        if file_metadata.contains_metadata_ext() {
            n_extended_bytes += size_of::<FileMetadataExt>();
        }

        if include_file_info {
            byte_pos += file_metadata.serialize(writer)?;

            // Need to read in the metadata values so we can calculate the materialized bytes.
            for _ in 0..num_entries {
                let entry = FileDataSequenceEntry::deserialize(reader)?;
                materialized_bytes += entry.unpacked_segment_bytes as u64;
                byte_pos += entry.serialize(writer)?;
            }

            // Okay to just copy the rest of the values over, as there is nothing different
            // between the two shards up to this point.
            if n_extended_bytes != 0 {
                byte_pos += copy(&mut reader.take(n_extended_bytes as u64), writer)? as usize;
            }

            // Put in the lookup information.
            file_lookup.push((truncate_hash(&file_metadata.file_hash), index));
            index += (1 + num_entries + n_extended_bytes / MDB_FILE_INFO_ENTRY_SIZE) as u32;
        } else {
            // This file is dropped from the output, but all of its records must still be
            // consumed so the reader lands on the next file header: first the data sequence
            // entries, then the verification / metadata-extension bytes.
            for _ in 0..num_entries {
                let _skipped = FileDataSequenceEntry::deserialize(reader)?;
            }
            copy(&mut reader.take(n_extended_bytes as u64), &mut std::io::sink())?;
        }
    }

    if let Some(self_) = self_verification {
        debug_assert_eq!(reader.stream_position()?, self_.metadata.cas_info_offset);
    }

    // ---- CAS info section ----
    let mut cas_lookup = Vec::<(u64, u32)>::new();
    let mut chunk_lookup = Vec::<(u64, (u32, u32))>::new();

    out_footer.cas_info_offset = byte_pos as u64;

    // Index of the next CAS header, counted in CAS-info entries (headers + chunks).
    let mut cas_index = 0;
    let mut stored_bytes_on_disk = 0;
    let mut stored_bytes = 0;

    loop {
        let cas_metadata = CASChunkSequenceHeader::deserialize(reader)?;

        // All CAS headers get serialized, including the bookend.
        byte_pos += cas_metadata.serialize(writer)?;

        if cas_metadata.is_bookend() {
            break;
        }

        if include_cas_lookup_table {
            cas_lookup.push((truncate_hash(&cas_metadata.cas_hash), cas_index));
        }

        for chunk_index in 0..cas_metadata.num_entries {
            let mut chunk = CASChunkSequenceEntry::deserialize(reader)?;

            // Make sure we don't actually put things into an unusable state: the default
            // key means "no keying", so the hash is left as-is in that case.
            if hmac_key != HMACKey::default() {
                chunk.chunk_hash = chunk.chunk_hash.hmac(hmac_key);
            }

            if include_chunk_lookup_table {
                chunk_lookup.push((truncate_hash(&chunk.chunk_hash), (cas_index, chunk_index)));
            }

            byte_pos += chunk.serialize(writer)?;
        }

        cas_index += 1 + cas_metadata.num_entries;
        stored_bytes_on_disk += cas_metadata.num_bytes_on_disk as u64;
        stored_bytes += cas_metadata.num_bytes_in_cas as u64;
    }

    if let Some(self_) = self_verification {
        debug_assert_eq!(reader.stream_position()?, self_.metadata.file_lookup_offset);
    }

    // ---- File lookup table ----
    out_footer.file_lookup_offset = byte_pos as u64;

    if include_file_info {
        if let Some(self_) = self_verification {
            debug_assert_eq!(file_lookup.len(), self_.metadata.file_lookup_num_entry as usize);
        }

        for &(key, idx) in file_lookup.iter() {
            write_u64(writer, key)?;
            write_u32(writer, idx)?;
        }

        byte_pos += file_lookup.len() * (size_of::<u64>() + size_of::<u32>());
        out_footer.file_lookup_num_entry = file_lookup.len() as u64;
    } else {
        out_footer.file_lookup_num_entry = 0;
    }

    // ---- CAS lookup table ----
    out_footer.cas_lookup_offset = byte_pos as u64;

    if include_cas_lookup_table {
        if let Some(self_) = self_verification {
            debug_assert_eq!(cas_lookup.len(), self_.metadata.cas_lookup_num_entry as usize);
        }

        for &(key, idx) in cas_lookup.iter() {
            write_u64(writer, key)?;
            write_u32(writer, idx)?;
        }

        byte_pos += cas_lookup.len() * (size_of::<u64>() + size_of::<u32>());
        out_footer.cas_lookup_num_entry = cas_lookup.len() as u64;
    } else {
        out_footer.cas_lookup_num_entry = 0;
    }

    // ---- Chunk lookup table ----
    out_footer.chunk_lookup_offset = byte_pos as u64;

    if include_chunk_lookup_table {
        // This one is different now that it's hmac keyed, so we need to rebuild it:
        // the keyed hashes are re-sorted by their truncated value before being written.
        chunk_lookup.sort_by_key(|s| s.0);

        for &(h, (a, b)) in chunk_lookup.iter() {
            write_u64(writer, h)?;
            write_u32(writer, a)?;
            write_u32(writer, b)?;
        }

        byte_pos += chunk_lookup.len() * (size_of::<u64>() + 2 * size_of::<u32>());
        out_footer.chunk_lookup_num_entry = chunk_lookup.len() as u64;
    } else {
        out_footer.chunk_lookup_num_entry = 0;
    }

    out_footer.chunk_hash_hmac_key = hmac_key;

    // Add in the timestamps, as whole seconds since the Unix epoch. A clock reading
    // before the epoch falls back to 0.
    let creation_time = std::time::SystemTime::now();

    out_footer.shard_creation_timestamp = creation_time.duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();

    out_footer.shard_key_expiry = creation_time
        .add(key_valid_for)
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();

    // Copy over the accumulated size statistics.
    out_footer.materialized_bytes = materialized_bytes;
    out_footer.stored_bytes_on_disk = stored_bytes_on_disk;
    out_footer.stored_bytes = stored_bytes;

    // And we're done here!
    out_footer.footer_offset = byte_pos as u64;

    // Write out the footer at the end.
    byte_pos += out_footer.serialize(writer)?;

    // Return the number of bytes written.
    Ok(byte_pos)
}