mdb_shard/src/shard_format.rs (1,275 lines of code) (raw):

use std::collections::{BTreeMap, HashMap}; use std::io::{copy, Read, Seek, SeekFrom, Write}; use std::mem::size_of; use std::ops::Add; use std::sync::Arc; use std::time::{Duration, UNIX_EPOCH}; use anyhow::anyhow; use futures::AsyncReadExt; use merklehash::{HMACKey, MerkleHash}; use static_assertions::const_assert; use tracing::debug; use utils::serialization_utils::*; use crate::cas_structs::*; use crate::constants::*; use crate::error::{MDBShardError, Result}; use crate::file_structs::*; use crate::interpolation_search::search_on_sorted_u64s; use crate::shard_in_memory::MDBInMemoryShard; use crate::utils::{shard_expiry_time, truncate_hash}; // Same size for FileDataSequenceHeader and FileDataSequenceEntry pub const MDB_FILE_INFO_ENTRY_SIZE: usize = size_of::<[u64; 4]>() + 4 * size_of::<u32>(); const_assert!(MDB_FILE_INFO_ENTRY_SIZE == size_of::<FileDataSequenceHeader>()); const_assert!(MDB_FILE_INFO_ENTRY_SIZE == size_of::<FileDataSequenceEntry>()); const_assert!(MDB_FILE_INFO_ENTRY_SIZE == size_of::<FileVerificationEntry>()); const_assert!(MDB_FILE_INFO_ENTRY_SIZE == size_of::<FileMetadataExt>()); // Same size for CASChunkSequenceHeader and CASChunkSequenceEntry const MDB_CAS_INFO_ENTRY_SIZE: usize = size_of::<[u64; 4]>() + 4 * size_of::<u32>(); const_assert!(MDB_CAS_INFO_ENTRY_SIZE == size_of::<CASChunkSequenceHeader>()); const_assert!(MDB_CAS_INFO_ENTRY_SIZE == size_of::<CASChunkSequenceEntry>()); const MDB_SHARD_FOOTER_SIZE: i64 = size_of::<MDBShardFileFooter>() as i64; const MDB_SHARD_HEADER_VERSION: u64 = 2; const MDB_SHARD_FOOTER_VERSION: u64 = 1; // At the start of each shard file, insert a tag plus a magic-number sequence of bytes to ensure // that we are able to quickly identify a file as a shard file. // FOR NOW: Change the header tag to include BETA. When we're ready to const MDB_SHARD_HEADER_TAG: [u8; 32] = [ b'H', b'F', b'R', b'e', b'p', b'o', b'M', b'e', b't', b'a', b'D', b'a', b't', b'a', 0, 85, 105, 103, 69, 106, 123, 129, 87, 131, 165, 189, 217, 92, 205, 209, 74, 169, ]; #[inline] pub fn current_timestamp() -> u64 { // Get the seconds since the epoc as u64 std::time::SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_secs() } #[derive(Clone, Debug, PartialEq)] pub struct MDBShardFileHeader { // Header to be determined? "XetHub MDB Shard File Version 1" pub tag: [u8; 32], pub version: u64, pub footer_size: u64, } impl Default for MDBShardFileHeader { fn default() -> Self { Self { tag: MDB_SHARD_HEADER_TAG, version: MDB_SHARD_HEADER_VERSION, footer_size: MDB_SHARD_FOOTER_SIZE as u64, } } } impl MDBShardFileHeader { pub fn serialize<W: Write>(&self, writer: &mut W) -> Result<usize> { writer.write_all(&MDB_SHARD_HEADER_TAG)?; write_u64(writer, self.version)?; write_u64(writer, self.footer_size)?; Ok(size_of::<MDBShardFileHeader>()) } pub fn deserialize<R: Read>(reader: &mut R) -> Result<Self> { let mut tag = [0u8; 32]; reader.read_exact(&mut tag)?; if tag != MDB_SHARD_HEADER_TAG { return Err(MDBShardError::ShardVersionError( "File does not appear to be a valid Merkle DB Shard file (Wrong Magic Number).".to_owned(), )); } Ok(Self { tag, version: read_u64(reader)?, footer_size: read_u64(reader)?, }) } } #[derive(Clone, Debug, PartialEq)] pub struct MDBShardFileFooter { pub version: u64, pub file_info_offset: u64, pub cas_info_offset: u64, // Lookup tables. These come after the info sections to allow the shard to be partially // read without needing to read the footer. pub file_lookup_offset: u64, pub file_lookup_num_entry: u64, pub cas_lookup_offset: u64, pub cas_lookup_num_entry: u64, pub chunk_lookup_offset: u64, pub chunk_lookup_num_entry: u64, // HMAC key protection for the chunk hashes. If zero, then no key. pub chunk_hash_hmac_key: HMACKey, // The creation time of this shard, in seconds since the epoc pub shard_creation_timestamp: u64, // The time, in seconds since the epoc, after which this shard is no longer assumed to be valid. // Locally created shards do not have an expiry. pub shard_key_expiry: u64, // More locations to stick in here if needed. pub _buffer: [u64; 6], pub stored_bytes_on_disk: u64, pub materialized_bytes: u64, pub stored_bytes: u64, pub footer_offset: u64, // Always last. } impl Default for MDBShardFileFooter { fn default() -> Self { Self { version: MDB_SHARD_FOOTER_VERSION, file_info_offset: 0, cas_info_offset: 0, file_lookup_offset: 0, file_lookup_num_entry: 0, cas_lookup_offset: 0, cas_lookup_num_entry: 0, chunk_lookup_offset: 0, chunk_lookup_num_entry: 0, chunk_hash_hmac_key: HMACKey::default(), // No HMAC key // On serialization, this is set to current time if this is zero. shard_creation_timestamp: 0, shard_key_expiry: u64::MAX, _buffer: [0u64; 6], stored_bytes_on_disk: 0, materialized_bytes: 0, stored_bytes: 0, footer_offset: 0, } } } impl MDBShardFileFooter { pub fn serialize<W: Write>(&self, writer: &mut W) -> Result<usize> { write_u64(writer, self.version)?; write_u64(writer, self.file_info_offset)?; write_u64(writer, self.cas_info_offset)?; write_u64(writer, self.file_lookup_offset)?; write_u64(writer, self.file_lookup_num_entry)?; write_u64(writer, self.cas_lookup_offset)?; write_u64(writer, self.cas_lookup_num_entry)?; write_u64(writer, self.chunk_lookup_offset)?; write_u64(writer, self.chunk_lookup_num_entry)?; write_hash(writer, &self.chunk_hash_hmac_key)?; write_u64(writer, self.shard_creation_timestamp)?; write_u64(writer, self.shard_key_expiry)?; write_u64s(writer, &self._buffer)?; write_u64(writer, self.stored_bytes_on_disk)?; write_u64(writer, self.materialized_bytes)?; write_u64(writer, self.stored_bytes)?; write_u64(writer, self.footer_offset)?; Ok(size_of::<MDBShardFileFooter>()) } pub fn deserialize<R: Read>(reader: &mut R) -> Result<Self> { let version = read_u64(reader)?; // Do a version check as a simple guard against using this in an old repository if version != MDB_SHARD_FOOTER_VERSION { return Err(MDBShardError::ShardVersionError(format!( "Error: Expected footer version {MDB_SHARD_FOOTER_VERSION}, got {version}" ))); } let mut obj = Self { version, file_info_offset: read_u64(reader)?, cas_info_offset: read_u64(reader)?, file_lookup_offset: read_u64(reader)?, file_lookup_num_entry: read_u64(reader)?, cas_lookup_offset: read_u64(reader)?, cas_lookup_num_entry: read_u64(reader)?, chunk_lookup_offset: read_u64(reader)?, chunk_lookup_num_entry: read_u64(reader)?, chunk_hash_hmac_key: read_hash(reader)?, shard_creation_timestamp: read_u64(reader)?, shard_key_expiry: read_u64(reader)?, ..Default::default() }; read_u64s(reader, &mut obj._buffer)?; obj.stored_bytes_on_disk = read_u64(reader)?; obj.materialized_bytes = read_u64(reader)?; obj.stored_bytes = read_u64(reader)?; obj.footer_offset = read_u64(reader)?; Ok(obj) } pub async fn deserialize_async<R: futures::io::AsyncRead + Unpin>(reader: &mut R) -> Result<Self> { let mut v = [0u8; size_of::<Self>()]; reader.read_exact(&mut v[..]).await?; let mut reader_curs = std::io::Cursor::new(&v); Self::deserialize(&mut reader_curs) } } /// File info. This is a list of the file hash content to be downloaded. /// /// Each file consists of a FileDataSequenceHeader following /// a sequence of FileDataSequenceEntry. /// /// [ /// FileDataSequenceHeader, // u32 index in File lookup directs here. /// [ /// FileDataSequenceEntry, /// ], /// ], // Repeats per file. /// /// /// ---------------------------------------------------------------------------- /// /// CAS info. This is a list of chunks in order of appearance in the CAS chunks. /// /// Each CAS consists of a CASChunkSequenceHeader following /// a sequence of CASChunkSequenceEntry. /// /// [ /// CASChunkSequenceHeader, // u32 index in CAS lookup directs here. /// [ /// CASChunkSequenceEntry, // (u32, u32) index in Chunk lookup directs here. /// ], /// ], // Repeats per CAS. /// /// ---------------------------------------------------------------------------- /// /// File info lookup. This is a lookup of a truncated file hash to the /// location index in the File info section. /// /// Sorted Vec<(u64, u32)> on the u64. /// /// The first entry is the u64 truncated file hash, and the next entry is the /// index in the file info section of the element that starts the file reconstruction section. /// /// ---------------------------------------------------------------------------- /// /// CAS info lookup. This is a lookup of a truncated CAS hash to the /// location index in the CAS info section. /// /// Sorted Vec<(u64, u32)> on the u64. /// /// The first entry is the u64 truncated CAS block hash, and the next entry is the /// index in the cas info section of the element that starts the cas entry section. /// /// ---------------------------------------------------------------------------- /// /// Chunk info lookup. This is a lookup of a truncated CAS chunk hash to the /// location in the CAS info section. /// /// Sorted Vec<(u64, (u32, u32))> on the u64. /// /// The first entry is the u64 truncated CAS chunk in a CAS block, the first u32 is the index /// in the CAS info section that is the start of the CAS block, and the subsequent u32 gives /// the offset index of the chunk in that CAS block. #[derive(Clone, Default, Debug, PartialEq)] pub struct MDBShardInfo { pub header: MDBShardFileHeader, pub metadata: MDBShardFileFooter, } impl MDBShardInfo { pub fn load_from_reader<R: Read + Seek>(reader: &mut R) -> Result<Self> { let mut obj = Self::default(); // Move cursor to beginning of shard file. reader.rewind()?; obj.header = MDBShardFileHeader::deserialize(reader)?; // Move cursor to end of shard file minus footer size. reader.seek(SeekFrom::End(-MDB_SHARD_FOOTER_SIZE))?; obj.metadata = MDBShardFileFooter::deserialize(reader)?; Ok(obj) } pub fn serialize_from<W: Write>(writer: &mut W, mdb: &MDBInMemoryShard, expiry: Option<Duration>) -> Result<Self> { let mut shard = MDBShardInfo::default(); let mut bytes_pos: usize = 0; // Write shard header. bytes_pos += shard.header.serialize(writer)?; // Write file info. shard.metadata.file_info_offset = bytes_pos as u64; let ((file_lookup_keys, file_lookup_vals), bytes_written) = Self::convert_and_save_file_info(writer, &mdb.file_content)?; bytes_pos += bytes_written; // Write CAS info. shard.metadata.cas_info_offset = bytes_pos as u64; let ((cas_lookup_keys, cas_lookup_vals), (chunk_lookup_keys, chunk_lookup_vals), bytes_written) = Self::convert_and_save_cas_info(writer, &mdb.cas_content)?; bytes_pos += bytes_written; // Write file info lookup table. shard.metadata.file_lookup_offset = bytes_pos as u64; shard.metadata.file_lookup_num_entry = file_lookup_keys.len() as u64; for (&e1, &e2) in file_lookup_keys.iter().zip(file_lookup_vals.iter()) { write_u64(writer, e1)?; write_u32(writer, e2)?; } bytes_pos += size_of::<u64>() * file_lookup_keys.len() + size_of::<u32>() * file_lookup_vals.len(); // Release memory. drop(file_lookup_keys); drop(file_lookup_vals); // Write cas info lookup table. shard.metadata.cas_lookup_offset = bytes_pos as u64; shard.metadata.cas_lookup_num_entry = cas_lookup_keys.len() as u64; for (&e1, &e2) in cas_lookup_keys.iter().zip(cas_lookup_vals.iter()) { write_u64(writer, e1)?; write_u32(writer, e2)?; } bytes_pos += size_of::<u64>() * cas_lookup_keys.len() + size_of::<u32>() * cas_lookup_vals.len(); // Write chunk lookup table. shard.metadata.chunk_lookup_offset = bytes_pos as u64; shard.metadata.chunk_lookup_num_entry = chunk_lookup_keys.len() as u64; for (&e1, &e2) in chunk_lookup_keys.iter().zip(chunk_lookup_vals.iter()) { write_u64(writer, e1)?; write_u32(writer, e2.0)?; write_u32(writer, e2.1)?; } bytes_pos += size_of::<u64>() * chunk_lookup_keys.len() + size_of::<u64>() * chunk_lookup_vals.len(); // Update repo size information. shard.metadata.stored_bytes_on_disk = mdb.stored_bytes_on_disk(); shard.metadata.materialized_bytes = mdb.materialized_bytes(); shard.metadata.stored_bytes = mdb.stored_bytes(); // Update footer offset. shard.metadata.footer_offset = bytes_pos as u64; if let Some(shard_valid_for) = expiry { // Use this to cause things to not be valid after a certain while. shard.metadata.shard_key_expiry = shard_expiry_time(shard_valid_for); } // Write shard footer. shard.metadata.serialize(writer)?; Ok(shard) } #[allow(clippy::type_complexity)] fn convert_and_save_file_info<W: Write>( writer: &mut W, file_content: &BTreeMap<MerkleHash, MDBFileInfo>, ) -> Result<( (Vec<u64>, Vec<u32>), // File Lookup Info usize, // Bytes used for File Content Info )> { // File info lookup table. let mut file_lookup_keys = Vec::<u64>::with_capacity(file_content.len()); let mut file_lookup_vals = Vec::<u32>::with_capacity(file_content.len()); let mut index: u32 = 0; let mut bytes_written = 0; for (file_hash, content) in file_content { file_lookup_keys.push(truncate_hash(file_hash)); file_lookup_vals.push(index); let bytes = content.serialize(writer)?; bytes_written += bytes; debug_assert!(bytes % MDB_FILE_INFO_ENTRY_SIZE == 0); index += (bytes / MDB_FILE_INFO_ENTRY_SIZE) as u32; } // Serialize a single bookend entry as a guard for sequential reading. bytes_written += FileDataSequenceHeader::bookend().serialize(writer)?; // No need to sort because BTreeMap is ordered and we truncate by the first 8 bytes. Ok(((file_lookup_keys, file_lookup_vals), bytes_written)) } #[allow(clippy::type_complexity)] fn convert_and_save_cas_info<W: Write>( writer: &mut W, cas_content: &BTreeMap<MerkleHash, Arc<MDBCASInfo>>, ) -> Result<( (Vec<u64>, Vec<u32>), // CAS Lookup Info (Vec<u64>, Vec<(u32, u32)>), // Chunk Lookup Info usize, // Bytes used for CAS Content Info )> { // CAS info lookup table. let mut cas_lookup_keys = Vec::<u64>::with_capacity(cas_content.len()); let mut cas_lookup_vals = Vec::<u32>::with_capacity(cas_content.len()); // Chunk lookup table. let mut chunk_lookup_keys = Vec::<u64>::with_capacity(cas_content.len()); // may grow let mut chunk_lookup_vals = Vec::<(u32, u32)>::with_capacity(cas_content.len()); // may grow let mut index: u32 = 0; let mut bytes_written = 0; for (cas_hash, content) in cas_content { cas_lookup_keys.push(truncate_hash(cas_hash)); cas_lookup_vals.push(index); bytes_written += content.metadata.serialize(writer)?; for (i, chunk) in content.chunks.iter().enumerate() { bytes_written += chunk.serialize(writer)?; chunk_lookup_keys.push(truncate_hash(&chunk.chunk_hash)); chunk_lookup_vals.push((index, i as u32)); } index += 1 + content.chunks.len() as u32; } // Serialize a single bookend entry as a guard for sequential reading. bytes_written += CASChunkSequenceHeader::bookend().serialize(writer)?; // No need to sort cas_lookup_ because BTreeMap is ordered and we truncate by the first 8 bytes. // Sort chunk lookup table by key. let mut chunk_lookup_combined = chunk_lookup_keys.iter().zip(chunk_lookup_vals.iter()).collect::<Vec<_>>(); chunk_lookup_combined.sort_unstable_by_key(|&(k, _)| k); Ok(( (cas_lookup_keys, cas_lookup_vals), ( chunk_lookup_combined.iter().map(|&(k, _)| *k).collect(), chunk_lookup_combined.iter().map(|&(_, v)| *v).collect(), ), bytes_written, )) } pub fn chunk_hashes_protected(&self) -> bool { self.metadata.chunk_hash_hmac_key != HMACKey::default() } pub fn chunk_hmac_key(&self) -> Option<HMACKey> { if self.metadata.chunk_hash_hmac_key == HMACKey::default() { None } else { Some(self.metadata.chunk_hash_hmac_key) } } pub fn get_file_info_index_by_hash<R: Read + Seek>( &self, reader: &mut R, file_hash: &MerkleHash, dest_indices: &mut [u32; 8], ) -> Result<usize> { let num_indices = search_on_sorted_u64s( reader, self.metadata.file_lookup_offset, self.metadata.file_lookup_num_entry, truncate_hash(file_hash), read_u32::<R>, dest_indices, )?; // Assume no more than 8 collisions. if num_indices < dest_indices.len() { Ok(num_indices) } else { Err(MDBShardError::TruncatedHashCollisionError(truncate_hash(file_hash))) } } pub fn get_cas_info_index_by_hash<R: Read + Seek>( &self, reader: &mut R, cas_hash: &MerkleHash, dest_indices: &mut [u32; 8], ) -> Result<usize> { let num_indices = search_on_sorted_u64s( reader, self.metadata.cas_lookup_offset, self.metadata.cas_lookup_num_entry, truncate_hash(cas_hash), read_u32::<R>, dest_indices, )?; // Assume no more than 8 collisions. if num_indices < dest_indices.len() { Ok(num_indices) } else { Err(MDBShardError::TruncatedHashCollisionError(truncate_hash(cas_hash))) } } pub fn get_cas_info_index_by_chunk<R: Read + Seek>( &self, reader: &mut R, unkeyed_chunk_hash: &MerkleHash, dest_indices: &mut [(u32, u32); 8], ) -> Result<usize> { let num_indices = search_on_sorted_u64s( reader, self.metadata.chunk_lookup_offset, self.metadata.chunk_lookup_num_entry, truncate_hash(&self.keyed_chunk_hash(unkeyed_chunk_hash)), |reader| Ok((read_u32(reader)?, read_u32(reader)?)), dest_indices, )?; // Chunk lookup hashes are Ok to have (many) collisions, // we will use a subset of collisions to do dedup. if num_indices == dest_indices.len() { debug!( "Found {:?} or more collisions when searching for truncated hash {:?}", dest_indices.len(), truncate_hash(unkeyed_chunk_hash) ); } Ok(num_indices) } /// Reads the file info from a specific index. Note that this is the position pub fn read_file_info<R: Read + Seek>(&self, reader: &mut R, file_entry_index: u32) -> Result<MDBFileInfo> { reader.seek(SeekFrom::Start( self.metadata.file_info_offset + (MDB_FILE_INFO_ENTRY_SIZE as u64) * (file_entry_index as u64), ))?; let Some(mdb_file) = MDBFileInfo::deserialize(reader)? else { return Err(MDBShardError::InternalError(anyhow!("invalid file entry index"))); }; Ok(mdb_file) } pub fn read_all_file_info_sections<R: Read + Seek>(&self, reader: &mut R) -> Result<Vec<MDBFileInfo>> { let mut ret = Vec::<MDBFileInfo>::with_capacity(self.num_file_entries()); reader.seek(SeekFrom::Start(self.metadata.file_info_offset))?; while let Some(mdb_file) = MDBFileInfo::deserialize(reader)? { ret.push(mdb_file); } Ok(ret) } pub fn read_all_cas_blocks<R: Read + Seek>(&self, reader: &mut R) -> Result<Vec<(CASChunkSequenceHeader, u64)>> { // Reads all the cas blocks, returning a list of the cas info and the // starting position of that cas block. let mut cas_blocks = Vec::<(CASChunkSequenceHeader, u64)>::with_capacity(self.metadata.cas_lookup_num_entry as usize); reader.seek(SeekFrom::Start(self.metadata.cas_info_offset))?; loop { let pos = reader.stream_position()?; let cas_block = CASChunkSequenceHeader::deserialize(reader)?; if cas_block.is_bookend() { break; } let n = cas_block.num_entries; cas_blocks.push((cas_block, pos)); reader.seek(SeekFrom::Current((size_of::<CASChunkSequenceEntry>() as i64) * (n as i64)))?; } Ok(cas_blocks) } /// Returns the keyed chunk hash for the shard. #[inline] pub fn keyed_chunk_hash(&self, chunk_hash: impl AsRef<MerkleHash>) -> MerkleHash { let chunk_hash = *chunk_hash.as_ref(); if self.metadata.chunk_hash_hmac_key != HMACKey::default() { chunk_hash.hmac(self.metadata.chunk_hash_hmac_key) } else { chunk_hash } } /// Returns a vector holding all the chunk hashes along with their (cas idx, entry idx) locations pub fn read_all_cas_blocks_full<R: Read + Seek>(&self, reader: &mut R) -> Result<Vec<MDBCASInfo>> { let mut ret = Vec::with_capacity(self.num_cas_entries()); let (cas_info_start, _cas_info_end) = self.cas_info_byte_range(); reader.seek(SeekFrom::Start(cas_info_start))?; while let Some(cas_info) = MDBCASInfo::deserialize(reader)? { debug_assert!(reader.stream_position()? < _cas_info_end); ret.push(cas_info); } debug_assert_eq!(reader.stream_position()?, _cas_info_end); Ok(ret) } pub fn read_full_cas_lookup<R: Read + Seek>(&self, reader: &mut R) -> Result<Vec<(u64, u32)>> { // Reads all the cas blocks, returning a list of the cas info and the // starting position of that cas block. let mut cas_lookup: Vec<(u64, u32)> = Vec::with_capacity(self.metadata.cas_lookup_num_entry as usize); reader.seek(SeekFrom::Start(self.metadata.cas_lookup_offset))?; for _ in 0..self.metadata.cas_lookup_num_entry { let trunc_cas_hash: u64 = read_u64(reader)?; let idx: u32 = read_u32(reader)?; cas_lookup.push((trunc_cas_hash, idx)); } Ok(cas_lookup) } // Given a file pointer, returns the information needed to reconstruct the file. // The information is stored in the destination vector dest_results. The function // returns true if the file hash was found, and false otherwise. pub fn get_file_reconstruction_info<R: Read + Seek>( &self, reader: &mut R, file_hash: &MerkleHash, ) -> Result<Option<MDBFileInfo>> { // Search in file info lookup table. let mut dest_indices = [0u32; 8]; let num_indices = self.get_file_info_index_by_hash(reader, file_hash, &mut dest_indices)?; // Check each file info if the file hash matches. for &file_entry_index in dest_indices.iter().take(num_indices) { let mdb_file_info = self.read_file_info(reader, file_entry_index)?; if mdb_file_info.metadata.file_hash == *file_hash { return Ok(Some(mdb_file_info)); } } Ok(None) } // Performs a query of block hashes against a known block hash, matching // as many of the values in query_hashes as possible. It returns the number // of entries matched from the input hashes, the CAS block hash of the match, // and the range matched from that block. // In this case, a location hint is given to the function. It will only return a // match from that point pub fn chunk_hash_dedup_query_direct<R: Read + Seek>( &self, reader: &mut R, unkeyed_query_hashes: &[MerkleHash], cas_entry_index: u32, cas_chunk_offset: u32, ) -> Result<Option<(usize, FileDataSequenceEntry)>> { if unkeyed_query_hashes.is_empty() { return Ok(None); } reader.seek(SeekFrom::Start( self.metadata.cas_info_offset + (MDB_CAS_INFO_ENTRY_SIZE as u64) * (cas_entry_index as u64), ))?; let cas_header = CASChunkSequenceHeader::deserialize(reader)?; if cas_chunk_offset != 0 { // Jump forward to the chunk at chunk_offset. reader.seek(SeekFrom::Current(MDB_CAS_INFO_ENTRY_SIZE as i64 * cas_chunk_offset as i64))?; } // Now, read in data while the query hashes match. let first_chunk = CASChunkSequenceEntry::deserialize(reader)?; if first_chunk.chunk_hash != self.keyed_chunk_hash(unkeyed_query_hashes[0]) { return Ok(None); } let mut n_bytes = first_chunk.unpacked_segment_bytes; // Read everything else until the CAS block end. let mut end_idx = 0; for i in 1.. { if cas_chunk_offset as usize + i == cas_header.num_entries as usize { end_idx = i; break; } let chunk = CASChunkSequenceEntry::deserialize(reader)?; if i == unkeyed_query_hashes.len() || chunk.chunk_hash != self.keyed_chunk_hash(unkeyed_query_hashes[i]) { end_idx = i; break; } n_bytes += chunk.unpacked_segment_bytes; } Ok(Some(( end_idx, FileDataSequenceEntry { cas_hash: cas_header.cas_hash, cas_flags: cas_header.cas_flags, unpacked_segment_bytes: n_bytes, chunk_index_start: cas_chunk_offset, chunk_index_end: cas_chunk_offset + end_idx as u32, }, ))) } // Performs a query of block hashes against a known block hash, matching // as many of the values in query_hashes as possible. It returns the number // of entries matched from the input hashes, the CAS block hash of the match, // and the range matched from that block. pub fn chunk_hash_dedup_query<R: Read + Seek>( &self, reader: &mut R, query_hashes: &[MerkleHash], ) -> Result<Option<(usize, FileDataSequenceEntry)>> { if query_hashes.is_empty() || self.metadata.chunk_lookup_num_entry == 0 { return Ok(None); } // Lookup CAS block from chunk lookup. let mut dest_indices = [(0u32, 0u32); 8]; let num_indices = self.get_cas_info_index_by_chunk(reader, &query_hashes[0], &mut dest_indices)?; // Sequentially match chunks in that block. for &(cas_index, chunk_offset) in dest_indices.iter().take(num_indices) { if let Some(cas) = self.chunk_hash_dedup_query_direct(reader, query_hashes, cas_index, chunk_offset)? { return Ok(Some(cas)); } } Ok(None) } pub fn read_all_truncated_hashes<R: Read + Seek>(&self, reader: &mut R) -> Result<Vec<(u64, (u32, u32))>> { let mut ret; if self.metadata.chunk_lookup_num_entry != 0 { ret = Vec::with_capacity(self.metadata.chunk_lookup_num_entry as usize); reader.seek(SeekFrom::Start(self.metadata.chunk_lookup_offset))?; for _ in 0..self.metadata.chunk_lookup_num_entry { ret.push((read_u64(reader)?, (read_u32(reader)?, read_u32(reader)?))); } } else { let (cas_info_start, cas_info_end) = self.cas_info_byte_range(); // We don't have the lookup table, so let n_elements_cap = (cas_info_end - cas_info_start) as usize / size_of::<CASChunkSequenceEntry>(); ret = Vec::with_capacity(n_elements_cap); let mut cas_index = 0; reader.seek(SeekFrom::Start(cas_info_start))?; while reader.stream_position()? < cas_info_end { let cas_header = CASChunkSequenceHeader::deserialize(reader)?; for chunk_index in 0..cas_header.num_entries { let chunk = CASChunkSequenceEntry::deserialize(reader)?; ret.push((truncate_hash(&chunk.chunk_hash), (cas_index, chunk_index))); } cas_index += 1 + cas_header.num_entries; } } Ok(ret) } pub fn num_cas_entries(&self) -> usize { self.metadata.cas_lookup_num_entry as usize } pub fn num_file_entries(&self) -> usize { self.metadata.file_lookup_num_entry as usize } pub fn total_num_chunks(&self) -> usize { self.metadata.chunk_lookup_num_entry as usize } pub fn file_info_byte_range(&self) -> (u64, u64) { (self.metadata.file_info_offset, self.metadata.cas_info_offset) } pub fn cas_info_byte_range(&self) -> (u64, u64) { (self.metadata.cas_info_offset, self.metadata.file_lookup_offset) } pub fn file_lookup_byte_range(&self) -> (u64, u64) { (self.metadata.file_lookup_offset, self.metadata.cas_lookup_offset) } pub fn cas_lookup_byte_range(&self) -> (u64, u64) { (self.metadata.cas_lookup_offset, self.metadata.chunk_lookup_offset) } pub fn chuck_lookup_byte_range(&self) -> (u64, u64) { (self.metadata.chunk_lookup_offset, self.metadata.footer_offset) } /// Returns the number of bytes in the shard pub fn num_bytes(&self) -> u64 { self.metadata.footer_offset + size_of::<MDBShardFileFooter>() as u64 } pub fn stored_bytes_on_disk(&self) -> u64 { self.metadata.stored_bytes_on_disk } pub fn materialized_bytes(&self) -> u64 { self.metadata.materialized_bytes } pub fn stored_bytes(&self) -> u64 { self.metadata.stored_bytes } /// returns the number of bytes that is fixed and not part of any content; i.e. would be part of an empty shard. pub fn non_content_byte_size() -> u64 { (size_of::<MDBShardFileFooter>() + size_of::<MDBShardFileHeader>()) as u64 // Header and footer + size_of::<FileDataSequenceHeader>() as u64 // Guard block for scanning. + size_of::<CASChunkSequenceHeader>() as u64 // Guard block for scanning. } pub fn print_report(&self) { // File info bytes. let (file_info_start, file_info_end) = self.file_info_byte_range(); eprintln!("Byte size of file info: {}, ({file_info_start} - {file_info_end})", file_info_end - file_info_start); // Cas info bytes. let (cas_info_start, cas_info_end) = self.cas_info_byte_range(); eprintln!("Byte size of cas info: {}, ({cas_info_start} - {cas_info_end})", cas_info_end - cas_info_start); // File lookup bytes. let (file_lookup_start, file_lookup_end) = self.file_lookup_byte_range(); eprintln!( "Byte size of file lookup: {}, ({file_lookup_start} - {file_lookup_end})", file_lookup_end - file_lookup_start ); // CAS lookup bytes. let (cas_lookup_start, cas_lookup_end) = self.cas_lookup_byte_range(); eprintln!( "Byte size of cas lookup: {}, ({cas_lookup_start} - {cas_lookup_end})", cas_lookup_end - cas_lookup_start ); // Chunk lookup bytes. let (chunk_lookup_start, chunk_lookup_end) = self.chuck_lookup_byte_range(); eprintln!( "Byte size of chunk lookup: {}, ({chunk_lookup_start} - {chunk_lookup_end})", chunk_lookup_end - chunk_lookup_start ); } /// Read all file info from shard and return a collection of /// (file_hash, (byte_start, byte_end) for file_data_sequence_entry, /// Option<(byte_start, byte_end)> for file_verification_entry, /// and an Option<MerkleHash> for the SHA if it is present. #[allow(clippy::type_complexity)] pub fn read_file_info_ranges<R: Read + Seek>( reader: &mut R, ) -> Result<Vec<(MerkleHash, (u64, u64), Option<(u64, u64)>, Option<MerkleHash>)>> { let mut ret = Vec::new(); let _shard_header = MDBShardFileHeader::deserialize(reader)?; loop { let header = FileDataSequenceHeader::deserialize(reader)?; if header.is_bookend() { break; } let byte_start = reader.stream_position()?; reader.seek(SeekFrom::Current(header.num_entries as i64 * size_of::<FileDataSequenceEntry>() as i64))?; let byte_end = reader.stream_position()?; let data_sequence_entry_byte_range = (byte_start, byte_end); let verification_entry_byte_range = if header.contains_verification() { let byte_start = byte_end; reader .seek(SeekFrom::Current(header.num_entries as i64 * size_of::<FileVerificationEntry>() as i64))?; let byte_end = reader.stream_position()?; Some((byte_start, byte_end)) } else { None }; let sha256 = if header.contains_metadata_ext() { let metadata_ext = FileMetadataExt::deserialize(reader)?; Some(metadata_ext.sha256) } else { None }; ret.push((header.file_hash, data_sequence_entry_byte_range, verification_entry_byte_range, sha256)); } Ok(ret) } /// Returns a list of chunk hashes for the global dedup service. /// The chunk hashes are either multiple of 'hash_filter_modulues', /// or the hash of the first chunk of a file present in the shard. pub fn filter_cas_chunks_for_global_dedup<R: Read + Seek>(reader: &mut R) -> Result<Vec<MerkleHash>> { let mut ret = Vec::new(); // First, go through and get all of the cas chunks. This allows us to form the lookup for the CAS block // hashes later. let shard = MDBShardInfo::load_from_reader(reader)?; let cas_chunks = shard.read_all_cas_blocks_full(reader)?; let mut cas_block_lookup = HashMap::<MerkleHash, usize>::with_capacity(cas_chunks.len()); for (i, cas_info) in cas_chunks.iter().enumerate() { cas_block_lookup.insert(cas_info.metadata.cas_hash, i); for chunk in cas_info.chunks.iter() { if hash_is_global_dedup_eligible(&chunk.chunk_hash) { ret.push(chunk.chunk_hash); } } } // Now, go through all the files present, collecting the first chunks of the files. // TODO: break this out into a utility if needed. let files = shard.read_all_file_info_sections(reader)?; for fi in files { let Some(entry) = fi.segments.first() else { continue; }; let Some(cas_block_index) = cas_block_lookup.get(&entry.cas_hash) else { continue; }; // Scan the cas entries to get the proper index let first_chunk_hash = cas_chunks[*cas_block_index].chunks[entry.chunk_index_start as usize].chunk_hash; ret.push(first_chunk_hash); } Ok(ret) } /// Export the current shard as an hmac keyed shard, returning the number of bytes written #[allow(clippy::too_many_arguments)] pub fn export_as_keyed_shard<R: Read + Seek, W: Write>( &self, reader: &mut R, writer: &mut W, hmac_key: HMACKey, key_valid_for: Duration, include_file_info: bool, include_cas_lookup_table: bool, include_chunk_lookup_table: bool, ) -> Result<usize> { Self::export_as_keyed_shard_impl( reader, writer, hmac_key, key_valid_for, include_file_info, include_cas_lookup_table, include_chunk_lookup_table, Some(self), ) } /// Export the current shard as an hmac keyed shard, #[allow(clippy::too_many_arguments)] pub fn export_as_keyed_shard_streaming<R: Read + Seek, W: Write>( reader: &mut R, writer: &mut W, hmac_key: HMACKey, key_valid_for: Duration, include_file_info: bool, include_cas_lookup_table: bool, include_chunk_lookup_table: bool, ) -> Result<usize> { Self::export_as_keyed_shard_impl( reader, writer, hmac_key, key_valid_for, include_file_info, include_cas_lookup_table, include_chunk_lookup_table, None, ) } /// Internal implementation of exporting the current shard as an hmac keyed shard, #[allow(clippy::too_many_arguments)] fn export_as_keyed_shard_impl<R: Read + Seek, W: Write>( reader: &mut R, writer: &mut W, hmac_key: HMACKey, key_valid_for: Duration, include_file_info: bool, include_cas_lookup_table: bool, include_chunk_lookup_table: bool, // Pass this in when we have it so we can use debug asserts for verification checking in tests. self_verification: Option<&Self>, ) -> Result<usize> { // The footer at the end that will hold each of these sections. let mut out_footer = MDBShardFileFooter::default(); // Read in the header, verifying all the information. let in_header = MDBShardFileHeader::deserialize(reader)?; // Dump out the header. let mut byte_pos = 0; byte_pos += in_header.serialize(writer)?; // Read in all the file information. out_footer.file_info_offset = byte_pos as u64; // Possibly save the lookup info here. let mut file_lookup = Vec::<(u64, u32)>::new(); // Index of entry for lookup table let mut index: u32 = 0; // materialized bytes for later storage let mut materialized_bytes = 0; loop { let file_metadata = FileDataSequenceHeader::deserialize(reader)?; if file_metadata.is_bookend() { // Serialize the bookend struct and move on. byte_pos += file_metadata.serialize(writer)?; break; } let num_entries = file_metadata.num_entries as usize; let mut n_extended_bytes = 0; if file_metadata.contains_verification() { n_extended_bytes += num_entries * size_of::<FileVerificationEntry>(); } if file_metadata.contains_metadata_ext() { n_extended_bytes += size_of::<FileMetadataExt>(); } if include_file_info { byte_pos += file_metadata.serialize(writer)?; // Need to read in the metadata values so we can calculate the materialized bytes for _ in 0..num_entries { let entry = FileDataSequenceEntry::deserialize(reader)?; materialized_bytes += entry.unpacked_segment_bytes as u64; byte_pos += entry.serialize(writer)?; } // Okay to just copy the rest of values over as there is nothing different between the two shards // up to this point. if n_extended_bytes != 0 { byte_pos += copy(&mut reader.take(n_extended_bytes as u64), writer)? as usize; } // Put in the lookup information file_lookup.push((truncate_hash(&file_metadata.file_hash), index)); index += (1 + num_entries + n_extended_bytes / MDB_FILE_INFO_ENTRY_SIZE) as u32; } else { // Discard values until the next reader break. copy(&mut reader.take(n_extended_bytes as u64), &mut std::io::sink())?; } } if let Some(self_) = self_verification { debug_assert_eq!(reader.stream_position()?, self_.metadata.cas_info_offset); } let mut cas_lookup = Vec::<(u64, u32)>::new(); let mut chunk_lookup = Vec::<(u64, (u32, u32))>::new(); // Now deal with all the cas information out_footer.cas_info_offset = byte_pos as u64; let mut cas_index = 0; let mut stored_bytes_on_disk = 0; let mut stored_bytes = 0; loop { let cas_metadata = CASChunkSequenceHeader::deserialize(reader)?; // All metadata gets serialized. byte_pos += cas_metadata.serialize(writer)?; if cas_metadata.is_bookend() { break; } if include_cas_lookup_table { cas_lookup.push((truncate_hash(&cas_metadata.cas_hash), cas_index)); } for chunk_index in 0..cas_metadata.num_entries { let mut chunk = CASChunkSequenceEntry::deserialize(reader)?; // MAke sure we don't actually put things into an unusable state. if hmac_key != HMACKey::default() { chunk.chunk_hash = chunk.chunk_hash.hmac(hmac_key); } if include_chunk_lookup_table { chunk_lookup.push((truncate_hash(&chunk.chunk_hash), (cas_index, chunk_index))); } byte_pos += chunk.serialize(writer)?; } cas_index += 1 + cas_metadata.num_entries; stored_bytes_on_disk += cas_metadata.num_bytes_on_disk as u64; stored_bytes += cas_metadata.num_bytes_in_cas as u64; } if let Some(self_) = self_verification { debug_assert_eq!(reader.stream_position()?, self_.metadata.file_lookup_offset); } // Copy over all the file lookup information if that's appropriate. out_footer.file_lookup_offset = byte_pos as u64; if include_file_info { if let Some(self_) = self_verification { debug_assert_eq!(file_lookup.len(), self_.metadata.file_lookup_num_entry as usize); } for &(key, idx) in file_lookup.iter() { write_u64(writer, key)?; write_u32(writer, idx)?; } byte_pos += file_lookup.len() * (size_of::<u64>() + size_of::<u32>()); out_footer.file_lookup_num_entry = file_lookup.len() as u64; } else { out_footer.file_lookup_num_entry = 0; } // CAS lookup section. out_footer.cas_lookup_offset = byte_pos as u64; if include_cas_lookup_table { if let Some(self_) = self_verification { debug_assert_eq!(cas_lookup.len(), self_.metadata.cas_lookup_num_entry as usize); } for &(key, idx) in cas_lookup.iter() { write_u64(writer, key)?; write_u32(writer, idx)?; } byte_pos += cas_lookup.len() * (size_of::<u64>() + size_of::<u32>()); out_footer.cas_lookup_num_entry = cas_lookup.len() as u64; } else { out_footer.cas_lookup_num_entry = 0; } out_footer.chunk_lookup_offset = byte_pos as u64; // Chunk lookup section. if include_chunk_lookup_table { // This one is different now that it's hmac keyed, so we need to rebuild it. chunk_lookup.sort_by_key(|s| s.0); for &(h, (a, b)) in chunk_lookup.iter() { write_u64(writer, h)?; write_u32(writer, a)?; write_u32(writer, b)?; } byte_pos += chunk_lookup.len() * (size_of::<u64>() + 2 * size_of::<u32>()); out_footer.chunk_lookup_num_entry = chunk_lookup.len() as u64; } else { out_footer.chunk_lookup_num_entry = 0; } out_footer.chunk_hash_hmac_key = hmac_key; // Add in the timestamps. let creation_time = std::time::SystemTime::now(); out_footer.shard_creation_timestamp = creation_time.duration_since(UNIX_EPOCH).unwrap_or_default().as_secs(); out_footer.shard_key_expiry = creation_time .add(key_valid_for) .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_secs(); // Copy over the stored information elsewhere out_footer.materialized_bytes = materialized_bytes; out_footer.stored_bytes_on_disk = stored_bytes_on_disk; out_footer.stored_bytes = stored_bytes; // And we're done here! out_footer.footer_offset = byte_pos as u64; // Write out the footer at the end. byte_pos += out_footer.serialize(writer)?; // Return the number of bytes written. Ok(byte_pos) } } pub mod test_routines { use std::cmp::min; use std::io::{Cursor, Read, Seek}; use std::mem::size_of; use merklehash::MerkleHash; use rand::rngs::{SmallRng, StdRng}; use rand::{Rng, SeedableRng}; use super::FileVerificationEntry; use crate::cas_structs::{CASChunkSequenceEntry, CASChunkSequenceHeader, MDBCASInfo}; use crate::error::Result; use crate::file_structs::{FileDataSequenceEntry, FileDataSequenceHeader, FileMetadataExt, MDBFileInfo}; use crate::shard_format::MDBShardInfo; use crate::shard_in_memory::MDBInMemoryShard; use crate::streaming_shard::MDBMinimalShard; pub fn simple_hash(n: u64) -> MerkleHash { MerkleHash::from([n, 1, 0, 0]) } pub fn rng_hash(seed: u64) -> MerkleHash { let mut rng = SmallRng::seed_from_u64(seed); MerkleHash::from([rng.random(), rng.random(), rng.random(), rng.random()]) } pub fn convert_to_file(shard: &MDBInMemoryShard) -> Result<Vec<u8>> { let mut buffer = Vec::<u8>::new(); MDBShardInfo::serialize_from(&mut buffer, shard, None)?; Ok(buffer) } #[allow(clippy::type_complexity)] pub fn gen_specific_shard( cas_nodes: &[(u64, &[(u64, u32)])], file_nodes: &[(u64, &[(u64, (u32, u32))])], verifications: Option<&[&[u64]]>, metadata_exts: Option<&[u64]>, ) -> Result<MDBInMemoryShard> { let mut shard = MDBInMemoryShard::default(); for (hash, chunks) in cas_nodes { let mut cas_block = Vec::<_>::new(); let mut pos = 0u32; for (h, s) in chunks.iter() { cas_block.push(CASChunkSequenceEntry::new(simple_hash(*h), pos, *s)); pos += s } shard.add_cas_block(MDBCASInfo { metadata: CASChunkSequenceHeader::new(simple_hash(*hash), chunks.len(), pos), chunks: cas_block, })?; } if let Some(exts) = metadata_exts { assert_eq!(file_nodes.len(), exts.len()); } if let Some(verification) = verifications { assert_eq!(file_nodes.len(), verification.len()); for (i, ((file_hash, segments), verification)) in file_nodes.iter().zip(verification.iter()).enumerate() { assert_eq!(segments.len(), verification.len()); let file_contents: Vec<_> = segments .iter() .map(|(h, (lb, ub))| FileDataSequenceEntry::new(simple_hash(*h), *ub - *lb, *lb, *ub)) .collect(); let verification = verification .iter() .map(|v: &u64| FileVerificationEntry::new(simple_hash(*v))) .collect(); let metadata_ext = metadata_exts.map(|exts| FileMetadataExt::new(simple_hash(exts[i]))); shard.add_file_reconstruction_info(MDBFileInfo { metadata: FileDataSequenceHeader::new( simple_hash(*file_hash), segments.len(), true, metadata_ext.is_some(), ), segments: file_contents, verification, metadata_ext, })?; } } else { for (i, (file_hash, segments)) in file_nodes.iter().enumerate() { let file_contents: Vec<_> = segments .iter() .map(|(h, (lb, ub))| FileDataSequenceEntry::new(simple_hash(*h), *ub - *lb, *lb, *ub)) .collect(); let metadata_ext = metadata_exts.map(|exts| FileMetadataExt::new(simple_hash(exts[i]))); shard.add_file_reconstruction_info(MDBFileInfo { metadata: FileDataSequenceHeader::new( simple_hash(*file_hash), segments.len(), false, metadata_ext.is_some(), ), segments: file_contents, verification: vec![], metadata_ext, })?; } } Ok(shard) } pub fn gen_random_shard( seed: u64, cas_block_sizes: &[usize], file_chunk_range_sizes: &[usize], contains_verification: bool, contains_metadata_ext: bool, ) -> Result<MDBInMemoryShard> { gen_random_shard_impl( seed, cas_block_sizes, file_chunk_range_sizes, contains_verification, contains_metadata_ext, false, ) } pub fn gen_random_shard_with_cas_references( seed: u64, cas_block_sizes: &[usize], file_chunk_range_sizes: &[usize], contains_verification: bool, contains_metadata_ext: bool, ) -> Result<MDBInMemoryShard> { gen_random_shard_impl( seed, cas_block_sizes, file_chunk_range_sizes, contains_verification, contains_metadata_ext, true, ) } pub fn gen_random_shard_impl( seed: u64, cas_block_sizes: &[usize], file_chunk_range_sizes: &[usize], contains_verification: bool, contains_metadata_ext: bool, files_cross_reference_cas: bool, ) -> Result<MDBInMemoryShard> { // generate the cas content stuff. let mut shard = MDBInMemoryShard::default(); let mut rng = StdRng::seed_from_u64(seed); let mut cas_nodes = Vec::new(); for cas_block_size in cas_block_sizes { let mut cas_block = Vec::<_>::new(); let mut pos = 0u32; for _ in 0..*cas_block_size { cas_block.push(CASChunkSequenceEntry::new(rng_hash(rng.random()), rng.random_range(10000..20000), pos)); pos += rng.random_range(10000..20000); } let cas_block = MDBCASInfo { metadata: CASChunkSequenceHeader::new(rng_hash(rng.random()), *cas_block_size, pos), chunks: cas_block, }; if files_cross_reference_cas { cas_nodes.push(cas_block.clone()); } shard.add_cas_block(cas_block)?; } for file_block_size in file_chunk_range_sizes { let file_info = if files_cross_reference_cas { gen_random_file_info_with_cas_references( &mut rng, &cas_nodes, file_block_size, contains_verification, contains_metadata_ext, ) } else { gen_random_file_info(&mut rng, file_block_size, contains_verification, contains_metadata_ext) }; shard.add_file_reconstruction_info(file_info)?; } Ok(shard) } pub fn gen_random_file_info( rng: &mut StdRng, file_block_size: &usize, contains_verification: bool, contains_metadata_ext: bool, ) -> MDBFileInfo { let file_hash = rng_hash(rng.random()); let file_contents: Vec<_> = (0..*file_block_size) .map(|_| { let lb = rng.random_range(0..10000); let ub = lb + rng.random_range(0..10000); FileDataSequenceEntry::new(rng_hash(rng.random()), ub - lb, lb, ub) }) .collect(); let verification = if contains_verification { file_contents .iter() .map(|_| FileVerificationEntry::new(rng_hash(rng.random()))) .collect() } else { vec![] }; let metadata_ext = contains_metadata_ext.then(|| rng_hash(rng.random())).map(FileMetadataExt::new); MDBFileInfo { metadata: FileDataSequenceHeader::new( file_hash, *file_block_size, contains_verification, metadata_ext.is_some(), ), segments: file_contents, verification, metadata_ext, } } pub fn gen_random_file_info_with_cas_references( rng: &mut StdRng, cas_nodes: &[MDBCASInfo], file_block_size: &usize, contains_verification: bool, contains_metadata_ext: bool, ) -> MDBFileInfo { let file_hash = rng_hash(rng.random()); // Not verified at the moment. let file_contents: Vec<_> = (0..*file_block_size) .map(|_| { let cas_idx = rng.random_range(0..cas_nodes.len()); let cas_block = &cas_nodes[cas_idx]; let start_idx = rng.random_range(0..cas_block.chunks.len()); let end_idx = rng.random_range((start_idx + 1)..=cas_nodes[cas_idx].chunks.len()); FileDataSequenceEntry::new( cas_block.metadata.cas_hash, cas_block.chunks[start_idx..end_idx] .iter() .map(|c| c.unpacked_segment_bytes) .sum(), start_idx as u32, end_idx as u32, ) }) .collect(); let verification = if contains_verification { file_contents .iter() .map(|_| FileVerificationEntry::new(rng_hash(rng.random()))) .collect() } else { vec![] }; let metadata_ext = contains_metadata_ext.then(|| rng_hash(rng.random())).map(FileMetadataExt::new); MDBFileInfo { metadata: FileDataSequenceHeader::new( file_hash, *file_block_size, contains_verification, metadata_ext.is_some(), ), segments: file_contents, verification, metadata_ext, } } pub fn verify_mdb_shard(shard: &MDBInMemoryShard) -> Result<()> { let buffer = convert_to_file(shard)?; verify_mdb_shards_match(shard, Cursor::new(&buffer)) } pub fn verify_mdb_shards_match<R: Read + Seek>(mem_shard: &MDBInMemoryShard, shard_info: R) -> Result<()> { let mut cursor = shard_info; // Now, test that the results on queries from the let shard_file = MDBShardInfo::load_from_reader(&mut cursor)?; // Test on the minimal shard format as well cursor.rewind()?; let min_shard = MDBMinimalShard::from_reader(&mut cursor, true, true).unwrap(); let mem_size = mem_shard.shard_file_size(); let disk_size = shard_file.num_bytes(); assert_eq!(mem_size, disk_size); assert_eq!(mem_shard.materialized_bytes(), shard_file.materialized_bytes()); assert_eq!(mem_shard.stored_bytes(), shard_file.stored_bytes()); for (k, cas_block) in mem_shard.cas_content.iter() { // Go through and test queries on both the in-memory shard and the // serialized shard, making sure that they match completely. for i in 0..cas_block.chunks.len() { // Test the dedup query over a few hashes in which all the // hashes queried are part of the cas_block. let query_hashes_1: Vec<MerkleHash> = cas_block.chunks[i..(i + 3).min(cas_block.chunks.len())] .iter() .map(|c| c.chunk_hash) .collect(); let n_items_to_read = query_hashes_1.len(); // Also test the dedup query over a few hashes in which some of the // hashes are part of the query, and the last is not. let mut query_hashes_2 = query_hashes_1.clone(); query_hashes_2.push(rng_hash(1000000 + i as u64)); let lb = i as u32; let ub = min(i + 3, cas_block.chunks.len()) as u32; for query_hashes in [&query_hashes_1, &query_hashes_2] { let result_m = mem_shard.chunk_hash_dedup_query(query_hashes).unwrap(); let result_f = shard_file.chunk_hash_dedup_query(&mut cursor, query_hashes)?.unwrap(); // Returns a tuple of (num chunks matched, FileDataSequenceEntry) assert_eq!(result_m.0, n_items_to_read); assert_eq!(result_f.0, n_items_to_read); // Make sure it gives the correct CAS block hash as the second part of the assert_eq!(result_m.1.cas_hash, *k); assert_eq!(result_f.1.cas_hash, *k); // Make sure the bounds are correct assert_eq!((result_m.1.chunk_index_start, result_m.1.chunk_index_end), (lb, ub)); assert_eq!((result_f.1.chunk_index_start, result_f.1.chunk_index_end), (lb, ub)); // Make sure everything else equal. assert_eq!(result_m, result_f); } } } // Test get file reconstruction info. // Against some valid hashes, let mut query_hashes: Vec<MerkleHash> = mem_shard.file_content.iter().map(|file| *file.0).collect(); // and a few (very likely) invalid somes. for i in 0..3 { query_hashes.push(rng_hash(1000000 + i as u64)); } for k in query_hashes.iter() { let result_m = mem_shard.get_file_reconstruction_info(k); let result_f = shard_file.get_file_reconstruction_info(&mut cursor, k)?; // Make sure two queries return same results. assert_eq!(result_m, result_f); // Make sure retriving the expected file. if result_m.is_some() { assert_eq!(result_m.unwrap().metadata.file_hash, *k); assert_eq!(result_f.unwrap().metadata.file_hash, *k); } } // Make sure the cas blocks and chunks are correct. let cas_blocks_full = shard_file.read_all_cas_blocks_full(&mut cursor)?; // Make sure the cas blocks are correct let cas_blocks = shard_file.read_all_cas_blocks(&mut cursor)?; assert_eq!(cas_blocks.len(), min_shard.num_cas()); for (i, (cas_block, pos)) in cas_blocks.into_iter().enumerate() { let cas = mem_shard.cas_content.get(&cas_block.cas_hash).unwrap(); assert_eq!(cas_block.num_entries, cas.chunks.len() as u32); cursor.seek(std::io::SeekFrom::Start(pos))?; let read_cas = MDBCASInfo::deserialize(&mut cursor)?.unwrap(); assert_eq!(read_cas.metadata, cas_block); assert_eq!(&read_cas, cas.as_ref()); assert_eq!(&cas_blocks_full[i], cas.as_ref()); let m_cas_chunk = min_shard.cas(i).unwrap(); assert_eq!(m_cas_chunk.header(), &cas_blocks_full[i].metadata); assert_eq!(m_cas_chunk.num_entries(), cas_blocks_full[i].chunks.len()); for j in 0..m_cas_chunk.num_entries() { assert_eq!(cas_blocks_full[i].chunks[j], m_cas_chunk.chunk(j)); } } // Make sure the file info section is good { cursor.seek(std::io::SeekFrom::Start(0))?; let file_info = MDBShardInfo::read_file_info_ranges(&mut cursor)?; assert_eq!(file_info.len(), mem_shard.file_content.len()); assert_eq!(file_info.len(), min_shard.num_files()); for (i, (file_hash, data_byte_range, verification_byte_range, _metadata_ext_byte_range)) in file_info.into_iter().enumerate() { let true_fi = mem_shard.file_content.get(&file_hash).unwrap(); // Check FileDataSequenceEntry let (byte_start, byte_end) = data_byte_range; cursor.seek(std::io::SeekFrom::Start(byte_start))?; let num_entries = (byte_end - byte_start) / (size_of::<FileDataSequenceEntry>() as u64); // No leftovers assert_eq!(num_entries * (size_of::<FileDataSequenceEntry>() as u64), byte_end - byte_start); assert_eq!(num_entries, true_fi.segments.len() as u64); // Minimal view is good let m_file_info = min_shard.file(i).unwrap(); assert_eq!(m_file_info.header(), &true_fi.metadata); assert_eq!(m_file_info.num_entries(), true_fi.segments.len()); for j in 0..true_fi.metadata.num_entries as usize { let pos = byte_start + (j * size_of::<FileDataSequenceEntry>()) as u64; cursor.seek(std::io::SeekFrom::Start(pos))?; let fie = FileDataSequenceEntry::deserialize(&mut cursor)?; assert_eq!(true_fi.segments[j], fie); assert_eq!(m_file_info.entry(j), true_fi.segments[j]) } // Check FileVerificationEntry if exists if let Some((byte_start, byte_end)) = verification_byte_range { cursor.seek(std::io::SeekFrom::Start(byte_start))?; let num_entries = (byte_end - byte_start) / (size_of::<FileVerificationEntry>() as u64); // No leftovers assert_eq!(num_entries * (size_of::<FileVerificationEntry>() as u64), byte_end - byte_start); assert_eq!(num_entries, true_fi.verification.len() as u64); for j in 0..true_fi.metadata.num_entries as usize { let pos = byte_start + (j * size_of::<FileVerificationEntry>()) as u64; cursor.seek(std::io::SeekFrom::Start(pos))?; let fie = FileVerificationEntry::deserialize(&mut cursor)?; assert_eq!(true_fi.verification[j], fie); assert_eq!(true_fi.verification[j], m_file_info.verification(j)); } } } } Ok(()) } } #[cfg(test)] mod tests { use super::test_routines::*; use crate::error::Result; #[test] fn test_simple() -> Result<()> { let shard = gen_random_shard(0, &[1, 1], &[1], false, false)?; verify_mdb_shard(&shard)?; let shard = gen_random_shard(0, &[1, 1], &[1], true, false)?; verify_mdb_shard(&shard)?; let shard = gen_random_shard(0, &[1, 1], &[1], false, true)?; verify_mdb_shard(&shard)?; let shard = gen_random_shard(0, &[1, 1], &[1], true, true)?; verify_mdb_shard(&shard)?; Ok(()) } #[test] fn test_specific() -> Result<()> { let mem_shard_1 = gen_specific_shard(&[(0, &[(11, 5)])], &[(100, &[(200, (0, 5))])], None, None)?; verify_mdb_shard(&mem_shard_1)?; let mem_shard_1 = gen_specific_shard(&[(0, &[(11, 5)])], &[(100, &[(200, (0, 5))])], Some(&[&[25]]), None)?; verify_mdb_shard(&mem_shard_1)?; let mem_shard_1 = gen_specific_shard(&[(0, &[(11, 5)])], &[(100, &[(200, (0, 5))])], None, Some(&[38]))?; verify_mdb_shard(&mem_shard_1)?; let mem_shard_1 = gen_specific_shard(&[(0, &[(11, 5)])], &[(100, &[(200, (0, 5))])], Some(&[&[25]]), Some(&[38]))?; verify_mdb_shard(&mem_shard_1)?; Ok(()) } #[test] fn test_multiple() -> Result<()> { let shard = gen_random_shard(0, &[1], &[1, 1], false, false)?; verify_mdb_shard(&shard)?; let shard = gen_random_shard(0, &[1, 5, 10, 8], &[4, 3, 5, 9, 4, 6], false, false)?; verify_mdb_shard(&shard)?; let shard = gen_random_shard(0, &[1, 5, 10, 8], &[4, 3, 5, 9, 4, 6], true, false)?; verify_mdb_shard(&shard)?; let shard = gen_random_shard(0, &[1, 5, 10, 8], &[4, 3, 5, 9, 4, 6], false, true)?; verify_mdb_shard(&shard)?; let shard = gen_random_shard(0, &[1, 5, 10, 8], &[4, 3, 5, 9, 4, 6], true, true)?; verify_mdb_shard(&shard)?; Ok(()) } #[test] fn test_corner_cases_empty() -> Result<()> { let shard = gen_random_shard(0, &[0], &[0], false, false)?; verify_mdb_shard(&shard)?; let shard = gen_random_shard(0, &[0], &[0], true, false)?; verify_mdb_shard(&shard)?; let shard = gen_random_shard(0, &[0], &[0], false, true)?; verify_mdb_shard(&shard)?; let shard = gen_random_shard(0, &[0], &[0], true, true)?; verify_mdb_shard(&shard)?; Ok(()) } #[test] fn test_corner_cases_empty_entries() -> Result<()> { let shard = gen_random_shard(0, &[5, 6, 0, 10, 0], &[3, 4, 5, 0, 4, 0], false, false)?; verify_mdb_shard(&shard)?; let shard = gen_random_shard(0, &[5, 6, 0, 10, 0], &[3, 4, 5, 0, 4, 0], true, false)?; verify_mdb_shard(&shard)?; let shard = gen_random_shard(0, &[5, 6, 0, 10, 0], &[3, 4, 5, 0, 4, 0], false, true)?; verify_mdb_shard(&shard)?; let shard = gen_random_shard(0, &[5, 6, 0, 10, 0], &[3, 4, 5, 0, 4, 0], true, true)?; verify_mdb_shard(&shard)?; Ok(()) } }