fn export_as_keyed_shard_impl()

in mdb_shard/src/shard_format.rs [1022:1240]


    /// Re-serializes the shard read from `reader` into `writer` as a "keyed" shard:
    /// every CAS chunk hash is re-keyed with `hmac_key` (unless the key is the
    /// default / all-zero key), and the file-info section and the three lookup
    /// tables are each retained or stripped according to the corresponding flag.
    ///
    /// A fresh footer is built with recomputed section offsets, lookup counts,
    /// byte statistics, the HMAC key, a creation timestamp, and a key expiry of
    /// `creation_time + key_valid_for`.
    ///
    /// Returns the total number of bytes written to `writer` (header, sections,
    /// and footer).
    ///
    /// # Errors
    ///
    /// Returns any I/O or deserialization error encountered while streaming the
    /// shard through.
    fn export_as_keyed_shard_impl<R: Read + Seek, W: Write>(
        reader: &mut R,
        writer: &mut W,
        hmac_key: HMACKey,
        key_valid_for: Duration,
        include_file_info: bool,
        include_cas_lookup_table: bool,
        include_chunk_lookup_table: bool,
        // Pass this in when we have it so we can use debug asserts for verification checking in tests.
        self_verification: Option<&Self>,
    ) -> Result<usize> {
        // The footer at the end that will hold each of these sections.
        let mut out_footer = MDBShardFileFooter::default();

        // Read in the header, verifying all the information.
        let in_header = MDBShardFileHeader::deserialize(reader)?;

        // Dump out the header; byte_pos tracks the write-side offset used for
        // the section offsets recorded in the footer.
        let mut byte_pos = 0;
        byte_pos += in_header.serialize(writer)?;

        // File info section starts right after the header.
        out_footer.file_info_offset = byte_pos as u64;

        // Possibly save the lookup info here.
        let mut file_lookup = Vec::<(u64, u32)>::new();

        // Index (in MDB_FILE_INFO_ENTRY_SIZE units) of the current file's first
        // entry, used by the file lookup table.
        let mut index: u32 = 0;

        // Materialized (unpacked) byte total, recorded in the footer at the end.
        let mut materialized_bytes = 0;

        loop {
            let file_metadata = FileDataSequenceHeader::deserialize(reader)?;

            if file_metadata.is_bookend() {
                // Serialize the bookend struct and move on; the bookend is
                // written even when the file info section is otherwise dropped.
                byte_pos += file_metadata.serialize(writer)?;
                break;
            }

            let num_entries = file_metadata.num_entries as usize;

            // Size of the optional trailing sections (verification entries and
            // extended metadata) that follow the per-file entries.
            let mut n_extended_bytes = 0;

            if file_metadata.contains_verification() {
                n_extended_bytes += num_entries * size_of::<FileVerificationEntry>();
            }

            if file_metadata.contains_metadata_ext() {
                n_extended_bytes += size_of::<FileMetadataExt>();
            }

            if include_file_info {
                byte_pos += file_metadata.serialize(writer)?;

                // Need to read in the metadata values so we can calculate the materialized bytes
                for _ in 0..num_entries {
                    let entry = FileDataSequenceEntry::deserialize(reader)?;
                    materialized_bytes += entry.unpacked_segment_bytes as u64;
                    byte_pos += entry.serialize(writer)?;
                }

                // Okay to just copy the rest of values over as there is nothing different between the two shards
                // up to this point.
                if n_extended_bytes != 0 {
                    byte_pos += copy(&mut reader.take(n_extended_bytes as u64), writer)? as usize;
                }

                // Put in the lookup information
                file_lookup.push((truncate_hash(&file_metadata.file_hash), index));
                index += (1 + num_entries + n_extended_bytes / MDB_FILE_INFO_ENTRY_SIZE) as u32;
            } else {
                // The file entries are dropped from the output, but the reader
                // must still be advanced past them.  (Previously only the
                // extended bytes were skipped, leaving the reader misaligned
                // for the CAS section that follows.)  Deserialize them so the
                // materialized byte total in the footer stays accurate.
                for _ in 0..num_entries {
                    let entry = FileDataSequenceEntry::deserialize(reader)?;
                    materialized_bytes += entry.unpacked_segment_bytes as u64;
                }

                // Discard the verification / extended metadata sections.
                copy(&mut reader.take(n_extended_bytes as u64), &mut std::io::sink())?;
            }
        }

        // Cross-check: the reader should now sit exactly at the CAS section.
        if let Some(self_) = self_verification {
            debug_assert_eq!(reader.stream_position()?, self_.metadata.cas_info_offset);
        }

        let mut cas_lookup = Vec::<(u64, u32)>::new();
        let mut chunk_lookup = Vec::<(u64, (u32, u32))>::new();

        // Now deal with all the cas information
        out_footer.cas_info_offset = byte_pos as u64;

        let mut cas_index = 0;
        let mut stored_bytes_on_disk = 0;
        let mut stored_bytes = 0;

        loop {
            let cas_metadata = CASChunkSequenceHeader::deserialize(reader)?;

            // All CAS metadata gets serialized, including the bookend.
            byte_pos += cas_metadata.serialize(writer)?;

            if cas_metadata.is_bookend() {
                break;
            }

            if include_cas_lookup_table {
                cas_lookup.push((truncate_hash(&cas_metadata.cas_hash), cas_index));
            }

            for chunk_index in 0..cas_metadata.num_entries {
                let mut chunk = CASChunkSequenceEntry::deserialize(reader)?;

                // Make sure we don't actually put things into an unusable state:
                // only re-key the chunk hash when a real (non-default) key is given.
                if hmac_key != HMACKey::default() {
                    chunk.chunk_hash = chunk.chunk_hash.hmac(hmac_key);
                }

                if include_chunk_lookup_table {
                    chunk_lookup.push((truncate_hash(&chunk.chunk_hash), (cas_index, chunk_index)));
                }

                byte_pos += chunk.serialize(writer)?;
            }

            cas_index += 1 + cas_metadata.num_entries;
            stored_bytes_on_disk += cas_metadata.num_bytes_on_disk as u64;
            stored_bytes += cas_metadata.num_bytes_in_cas as u64;
        }

        // Cross-check: the reader should now sit exactly at the file lookup table.
        if let Some(self_) = self_verification {
            debug_assert_eq!(reader.stream_position()?, self_.metadata.file_lookup_offset);
        }

        // Copy over all the file lookup information if that's appropriate.
        out_footer.file_lookup_offset = byte_pos as u64;

        if include_file_info {
            if let Some(self_) = self_verification {
                debug_assert_eq!(file_lookup.len(), self_.metadata.file_lookup_num_entry as usize);
            }

            for &(key, idx) in file_lookup.iter() {
                write_u64(writer, key)?;
                write_u32(writer, idx)?;
            }

            byte_pos += file_lookup.len() * (size_of::<u64>() + size_of::<u32>());

            out_footer.file_lookup_num_entry = file_lookup.len() as u64;
        } else {
            out_footer.file_lookup_num_entry = 0;
        }

        // CAS lookup section.
        out_footer.cas_lookup_offset = byte_pos as u64;

        if include_cas_lookup_table {
            if let Some(self_) = self_verification {
                debug_assert_eq!(cas_lookup.len(), self_.metadata.cas_lookup_num_entry as usize);
            }

            for &(key, idx) in cas_lookup.iter() {
                write_u64(writer, key)?;
                write_u32(writer, idx)?;
            }

            byte_pos += cas_lookup.len() * (size_of::<u64>() + size_of::<u32>());

            out_footer.cas_lookup_num_entry = cas_lookup.len() as u64;
        } else {
            out_footer.cas_lookup_num_entry = 0;
        }

        out_footer.chunk_lookup_offset = byte_pos as u64;

        // Chunk lookup section.
        if include_chunk_lookup_table {
            // This one is different now that it's hmac keyed, so we need to rebuild it:
            // re-keying changed the truncated hashes, so re-sort by hash key.
            chunk_lookup.sort_by_key(|s| s.0);

            for &(h, (a, b)) in chunk_lookup.iter() {
                write_u64(writer, h)?;
                write_u32(writer, a)?;
                write_u32(writer, b)?;
            }

            byte_pos += chunk_lookup.len() * (size_of::<u64>() + 2 * size_of::<u32>());

            out_footer.chunk_lookup_num_entry = chunk_lookup.len() as u64;
        } else {
            out_footer.chunk_lookup_num_entry = 0;
        }

        out_footer.chunk_hash_hmac_key = hmac_key;

        // Add in the timestamps; both are seconds since the Unix epoch.
        let creation_time = std::time::SystemTime::now();

        out_footer.shard_creation_timestamp = creation_time.duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();

        out_footer.shard_key_expiry = creation_time
            .add(key_valid_for)
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();

        // Copy over the stored information elsewhere
        out_footer.materialized_bytes = materialized_bytes;
        out_footer.stored_bytes_on_disk = stored_bytes_on_disk;
        out_footer.stored_bytes = stored_bytes;

        // And we're done here!
        out_footer.footer_offset = byte_pos as u64;

        // Write out the footer at the end.
        byte_pos += out_footer.serialize(writer)?;

        // Return the number of bytes written.
        Ok(byte_pos)
    }