in below/store/src/lib.rs [445:575]
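/// Serialize `data` and append it to the current shard: the frame bytes are
/// written to the data file and a fixed-size `IndexEntry` (timestamp, offset,
/// length, flags and CRCs) is written to the index file. Panics if `timestamp`
/// falls in a different shard.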
fn put_in_current_shard(&mut self, timestamp: SystemTime, data: &DataFrame) -> Result<()> {
let shard = calculate_shard(timestamp);
if shard != self.shard {
panic!("Can't write data to shard as it belongs to different shard")
}
// PO2 chunk size in bytes if dict compression is used, otherwise 0.
let chunk_alignment_po2 =
if let CompressionMode::ZstdDictionary(ChunkSizePo2(chunk_size_po2)) =
self.compression_mode
{
// chunk_size_po2 is a count of entries. Adding the entry size
// po2 gives the chunk size in bytes as a po2.
chunk_size_po2 + INDEX_ENTRY_SIZE_PO2
} else {
0
};
// If dict compression is used but the Compressor is uninitialized (e.g. a
// new shard, or the previous write failed), pad the index to start a new
// chunk. Otherwise pad only to keep the index file aligned to INDEX_ENTRY_SIZE.
let alignment_po2 = if chunk_alignment_po2 != 0 && self.compressor.is_none() {
chunk_alignment_po2
} else {
INDEX_ENTRY_SIZE_PO2
};
let (index_len, aligned_len) = Self::pad_and_get_index_len(&mut self.index, alignment_po2)
.with_context(|| {
format!(
"Failed to get index length and possibly pad index file: index_{:011}",
shard
)
})?;
if index_len != aligned_len {
if alignment_po2 == INDEX_ENTRY_SIZE_PO2 {
warn!(
self.logger,
"Index length not a multiple of fixed index entry size: {}. Padded to size: {}",
index_len,
aligned_len,
);
} else if alignment_po2 == chunk_alignment_po2 {
// This always happens when below restarts, so log at info level.
info!(
self.logger,
"Padded index so that first entry of block is aligned. Previous len: {}. New len: {}",
index_len,
aligned_len,
);
} else {
panic!("Unexpected alignment_po2 value");
}
}
// Take the compressor from self before modifying it. If any write failure
// occurs, the old compressor (potentially in a bad state) is discarded and
// a new one will be created on the next write. No-op if compression is not
// used.
let mut compressor = self.compressor.take();
// If dict compression is used and the index file is chunk aligned, the
// current frame is the key frame.
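// aligned_len having at least chunk_alignment_po2 trailing zero bits means
// it is a multiple of the chunk size in bytes, i.e. this entry starts a new
// chunk.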
let is_key_frame =
chunk_alignment_po2 != 0 && aligned_len.trailing_zeros() >= chunk_alignment_po2;
let (serialized, flags) = self
.get_bytes_and_flags_for_frame(data, &mut compressor, is_key_frame)
.context("Failed to get serialized frame and flags")?;
// Appends to the data file are large and cannot be atomic. We
// may have partial writes that increase the file size without
// updating the stored state, so always read the actual data
// file length. This is less of an issue for the index file
// but we track it anyway.
let data_len = self
.data
.metadata()
.with_context(|| {
format!(
"Failed to get metadata of data file: data_{:011}",
self.shard
)
})?
.len();
// Warn about potential data file corruption
if self.data_len != data_len {
warn!(
self.logger,
"Data length mismatch: {} (expect {})", data_len, self.data_len
);
self.data_len = data_len;
}
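// The frame is appended at the current end of the data file; this offset is
// recorded in the index entry.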
let offset = self.data_len;
// It doesn't really matter in which order we write the data;
// most filesystems do not provide ordering guarantees for
// appends to different files anyway. We just need to handle
// the various failure cases on the read side.
self.data
.write_all(&serialized)
.context("Failed to write entry to data file")?;
self.data_len += serialized.len() as u64;
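// CRC of the serialized frame bytes, stored in the index entry alongside
// the frame's offset and length.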
let data_crc = serialized.crc32();
let mut index_entry = IndexEntry {
timestamp: get_unix_timestamp(timestamp),
offset,
flags,
len: serialized
.len()
.try_into()
.with_context(|| format!("Serialized len={} overflows u32", serialized.len()))?,
data_crc,
index_crc: 0,
};
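// The entry CRC is computed with the index_crc field zeroed, then filled in.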
index_entry.index_crc = index_entry.crc32();
{
// Unsafe: view the IndexEntry as a raw byte slice so it can be written to the index file.
let entry_slice = unsafe {
std::slice::from_raw_parts(
&index_entry as *const IndexEntry as *const u8,
INDEX_ENTRY_SIZE,
)
};
self.index
.write_all(entry_slice)
.context("Failed to write entry to index file")?;
}
// Set compressor only after successful writes. No-op if not in
// compression mode
self.compressor = compressor;
Ok(())
}