fn put_in_current_shard()

in below/store/src/lib.rs [445:575]


    fn put_in_current_shard(&mut self, timestamp: SystemTime, data: &DataFrame) -> Result<()> {
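        // The caller is responsible for directing each frame to the shard
        // that owns its timestamp; a mismatch here is treated as a logic
        // error, hence the panic below rather than a recoverable error.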
        let shard = calculate_shard(timestamp);
        if shard != self.shard {
            panic!("Can't write data to shard as it belongs to different shard")
        }

        // Chunk size in bytes as a power-of-two exponent (PO2) if dict
        // compression is used, otherwise 0.
        let chunk_alignment_po2 =
            if let CompressionMode::ZstdDictionary(ChunkSizePo2(chunk_size_po2)) =
                self.compression_mode
            {
                // chunk_size_po2 counts entries. Adding the entry-size
                // exponent yields the chunk size in bytes, also as a PO2.
                chunk_size_po2 + INDEX_ENTRY_SIZE_PO2
            } else {
                0
            };
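        // Illustrative arithmetic: if a chunk holds 2^4 entries and each
        // index entry is 2^5 bytes, chunk_alignment_po2 = 4 + 5 = 9, i.e.
        // chunks are aligned to 2^9 = 512 bytes. (These exponents are
        // examples, not the crate's actual constants.)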
        // If dict compression is used but the Compressor is uninitialized
        // (e.g. a new shard, or the previous write failed), pad the index to
        // start a new chunk. Otherwise pad only to keep the index file
        // aligned to INDEX_ENTRY_SIZE.
        let alignment_po2 = if chunk_alignment_po2 != 0 && self.compressor.is_none() {
            chunk_alignment_po2
        } else {
            INDEX_ENTRY_SIZE_PO2
        };
        let (index_len, aligned_len) = Self::pad_and_get_index_len(&mut self.index, alignment_po2)
            .with_context(|| {
                format!(
                    "Failed to get index length and possibly pad index file: index_{:011}",
                    shard
                )
            })?;
        if index_len != aligned_len {
            if alignment_po2 == INDEX_ENTRY_SIZE_PO2 {
                warn!(
                    self.logger,
                    "Index length not a multiple of fixed index entry size: {}. Padded to size: {}",
                    index_len,
                    aligned_len,
                );
            } else if alignment_po2 == chunk_alignment_po2 {
                // This always happens when below restarts, so log at info level.
                info!(
                    self.logger,
                    "Padded index so that first entry of block is aligned. Previous len: {}. New len: {}",
                    index_len,
                    aligned_len,
                );
            } else {
                panic!("Unexpected alignment_po2 value");
            }
        }

        // Take the compressor out of self before using it. If any write
        // failure occurs, the old compressor (potentially in a bad state) is
        // discarded and a new one will be created on the next write. No-op if
        // compression is not used.
        let mut compressor = self.compressor.take();
        // If dict compression is used and the index file is chunk-aligned,
        // the current frame starts a new chunk and is its key frame.
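        // aligned_len.trailing_zeros() >= chunk_alignment_po2 means
        // aligned_len is a multiple of 2^chunk_alignment_po2, i.e. this
        // entry is the first one in its chunk.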
        let is_key_frame =
            chunk_alignment_po2 != 0 && aligned_len.trailing_zeros() >= chunk_alignment_po2;
        let (serialized, flags) = self
            .get_bytes_and_flags_for_frame(data, &mut compressor, is_key_frame)
            .context("Failed to get serialized frame and flags")?;

        // Appends to the data file are large and cannot be atomic, so a
        // partial write may grow the file without updating our stored
        // length. Always read the actual data file length. This is less
        // of an issue for the index file, but we track it anyway.
        let data_len = self
            .data
            .metadata()
            .with_context(|| {
                format!(
                    "Failed to get metadata of data file: data_{:011}",
                    self.shard
                )
            })?
            .len();
        // Warn about potential data file corruption and resync the stored length.
        if self.data_len != data_len {
            warn!(
                self.logger,
                "Data length mismatch: {} (expect {})", data_len, self.data_len
            );
            self.data_len = data_len;
        }

        let offset = self.data_len;
        // It doesn't really matter in which order we write the data;
        // most filesystems provide no ordering guarantees for appends
        // to different files anyway. We just need to handle the
        // various failure cases on the read side.
        self.data
            .write_all(&serialized)
            .context("Failed to write entry to data file")?;
        self.data_len += serialized.len() as u64;
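        // Checksum the serialized frame. The CRC is stored in the index entry
        // so readers can detect torn or corrupt writes in the data file.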
        let data_crc = serialized.crc32();

        let mut index_entry = IndexEntry {
            timestamp: get_unix_timestamp(timestamp),
            offset,
            flags,
            len: serialized
                .len()
                .try_into()
                .with_context(|| format!("Serialized len={} overflows u32", serialized.len()))?,
            data_crc,
            index_crc: 0,
        };
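        // Compute the entry's CRC while index_crc is still zero; a reader can
        // verify the entry by zeroing the field and recomputing the checksum.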
        index_entry.index_crc = index_entry.crc32();
        {
            // It is unsafe to reinterpret the struct as a byte slice, but we
            // need the raw bytes in order to write the entry out.
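            // Safety: this cast assumes IndexEntry has a stable, padding-free
            // layout (e.g. #[repr(C, packed)]) and that INDEX_ENTRY_SIZE
            // equals size_of::<IndexEntry>(); both are assumptions about
            // definitions outside this snippet.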
            let entry_slice = unsafe {
                std::slice::from_raw_parts(
                    &index_entry as *const IndexEntry as *const u8,
                    INDEX_ENTRY_SIZE,
                )
            };
            self.index
                .write_all(entry_slice)
                .context("Failed to write entry to index file")?;
        }

        // Put the compressor back only after all writes succeeded. No-op if
        // compression is not used.
        self.compressor = compressor;
        Ok(())
    }
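
The padding step rounds the index length up to the next multiple of 2^alignment_po2. Below is a minimal sketch of that rounding, assuming pad_and_get_index_len uses standard power-of-two bit arithmetic; the helper name align_up_po2 is illustrative, and the real function also does file work (extending the index with zero bytes) that is not shown here.

// Round `len` up to the next multiple of 2^po2 (illustrative helper; the
// real pad_and_get_index_len also writes the zero padding to the file).
fn align_up_po2(len: u64, po2: u32) -> u64 {
    let mask = (1u64 << po2) - 1;
    (len + mask) & !mask
}

fn main() {
    // With 2^5 = 32-byte entries: a 100-byte index pads to 128 bytes.
    assert_eq!(align_up_po2(100, 5), 128);
    // An already-aligned length is unchanged, so no padding is written.
    assert_eq!(align_up_po2(128, 5), 128);
    // A length that is a multiple of 2^9 would mark a chunk boundary in
    // the key-frame check above (using the example exponents).
    assert_eq!(align_up_po2(500, 9), 512);
}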