in below/store/src/lib.rs [1494:1614]
fn _try_discard_until_size(compression_mode: CompressionMode, format: Format) {
let dir = TempDir::new("below_store_test").expect("tempdir failed");
let dir_path_buf = dir.path().to_path_buf();
let ts = std::time::UNIX_EPOCH + Duration::from_secs(SHARD_TIME);
let mut shard_sizes = Vec::new();
let mut writer = StoreWriter::new(get_logger(), &dir, compression_mode, format)
.expect("Failed to create store");
// Write n samples from timestamp 1 seconds apart, returning size
// increase of the store directory.
let mut write = |timestamp: SystemTime, n: u64| -> u64 {
let dir_size = get_dir_size(dir_path_buf.clone());
let mut frame = DataFrame::default();
for i in 0..n {
frame.sample.cgroup.memory_current = Some(n as i64 + i as i64);
writer
.put(timestamp + Duration::from_secs(i as u64), &frame)
.expect("Failed to store data");
}
let dir_size_after = get_dir_size(dir_path_buf.clone());
assert!(
dir_size_after > dir_size,
"Directory size did not increase. before: {} after: {}: n_samples {}",
dir_size,
dir_size_after,
n,
);
return dir_size_after - dir_size;
};
let num_shards = 7;
for i in 0..num_shards {
shard_sizes.push(write(ts + Duration::from_secs(SHARD_TIME * i), i + 1));
}
let total_size = shard_sizes.iter().sum::<u64>();
// In the following tests, we use new instances of StoreCursor so that
// it doesn't continue using the mmap of current files.
{
// Nothing is discarded
let target_size = total_size;
assert!(
writer
.try_discard_until_size(target_size)
.expect("Failed to discard data")
);
let frame = StoreCursor::new(get_logger(), dir.path().to_path_buf())
.get_next(&get_unix_timestamp(ts), Direction::Forward)
.expect("Failed to read sample")
.expect("Did not find stored sample");
assert_ts!(frame.0, ts);
assert_eq!(frame.1.sample.cgroup.memory_current, Some(1));
}
{
// Delete first shard
let target_size = total_size - 1;
assert!(
writer
.try_discard_until_size(target_size)
.expect("Failed to discard data")
);
let frame = StoreCursor::new(get_logger(), dir.path().to_path_buf())
.get_next(&get_unix_timestamp(ts), Direction::Forward)
.expect("Failed to read sample")
.expect("Did not find stored sample");
// assert_ts!(frame.0, ts + Duration::from_secs(SHARD_TIME));
assert_eq!(frame.1.sample.cgroup.memory_current, Some(2));
}
{
// Delete second and third shards
let target_size = total_size - (shard_sizes[0] + shard_sizes[1] + shard_sizes[2]);
assert!(
writer
.try_discard_until_size(target_size)
.expect("Failed to discard data")
);
let frame = StoreCursor::new(get_logger(), dir.path().to_path_buf())
.get_next(&get_unix_timestamp(ts), Direction::Forward)
.expect("Failed to read sample")
.expect("Did not find stored sample");
assert_ts!(frame.0, ts + Duration::from_secs(SHARD_TIME * 3));
assert_eq!(frame.1.sample.cgroup.memory_current, Some(4));
}
{
// Delete fourth and fifth shards, with a target directory size
// slightly greater than the resulting size directory size
let target_size = total_size - (shard_sizes[0] + shard_sizes[1] + shard_sizes[2] +
shard_sizes[3] + shard_sizes[4])
+ /* smaller than a shard */ 1;
assert!(
writer
.try_discard_until_size(target_size)
.expect("Failed to discard data")
);
let frame = StoreCursor::new(get_logger(), dir.path().to_path_buf())
.get_next(&get_unix_timestamp(ts), Direction::Forward)
.expect("Failed to read sample")
.expect("Did not find stored sample");
assert_ts!(frame.0, ts + Duration::from_secs(SHARD_TIME * 5));
assert_eq!(frame.1.sample.cgroup.memory_current, Some(6));
}
{
// Delete until size is 1. Verify that the current shard remains
// (i.e. size > 1).
assert!(
!writer
.try_discard_until_size(1)
.expect("Failed to discard data"),
);
let frame = StoreCursor::new(get_logger(), dir.path().to_path_buf())
.get_next(&get_unix_timestamp(ts), Direction::Forward)
.expect("Failed to read sample")
.expect("Did not find stored sample");
assert_ts!(frame.0, ts + Duration::from_secs(SHARD_TIME) * 6);
assert_eq!(frame.1.sample.cgroup.memory_current, Some(7));
}
}