in below/store/src/lib.rs [1163:1259]
/// Shard-rollover test body, parameterized over compression mode and
/// serialization format.
///
/// Using a fresh temporary directory, it:
/// 1. writes two samples one second apart and asserts `put` returns `false`
///    for both (no shard switch),
/// 2. writes a third sample `SHARD_TIME` seconds after the first and asserts
///    `put` returns `true` (shard switch),
/// 3. opens a second `StoreWriter` one second later and asserts its `put`
///    returns `false` (it lands in the already existing shard),
/// 4. reads all four samples back in order through a `StoreCursor` and checks
///    both the timestamps and the stored `memory_current` values.
fn _put_new_shard(compression_mode: CompressionMode, format: Format) {
    let dir = TempDir::new("below_store_test").expect("tempdir failed");

    // Choose a base timestamp such that writes within the next 60 seconds
    // stay in the same shard: if `now` and `now + 60s` are in different
    // shards, start from the later one instead.
    let now = SystemTime::now();
    let one_minute_later = now + Duration::from_secs(60);
    let ts = if calculate_shard(now) != calculate_shard(one_minute_later) {
        one_minute_later
    } else {
        now
    };

    // The four timestamps used by this test, in write order.
    let second_sample_ts = ts + Duration::from_secs(1);
    let next_shard_ts = ts + Duration::from_secs(SHARD_TIME);
    let reopened_ts = ts + Duration::from_secs(SHARD_TIME + 1);

    {
        let mut first_writer =
            StoreWriter::new_with_timestamp(get_logger(), &dir, ts, compression_mode, format)
                .expect("Failed to create store");
        let mut data = DataFrame::default();

        // Freshly created writer: the first write does not count as a shard switch.
        data.sample.cgroup.memory_current = Some(111);
        let switched = first_writer.put(ts, &data).expect("Failed to store data");
        assert!(!switched);

        // One second later: still the same shard.
        data.sample.cgroup.memory_current = Some(222);
        let switched = first_writer
            .put(second_sample_ts, &data)
            .expect("Failed to store data");
        assert!(!switched);

        // A full SHARD_TIME later: the writer must move to a new shard.
        data.sample.cgroup.memory_current = Some(333);
        let switched = first_writer
            .put(next_shard_ts, &data)
            .expect("Failed to store data");
        assert!(switched);
    }

    {
        // A brand-new writer whose timestamp falls into the shard created above.
        let mut second_writer = StoreWriter::new_with_timestamp(
            get_logger(),
            &dir,
            reopened_ts,
            compression_mode,
            format,
        )
        .expect("Failed to create store");
        let mut data = DataFrame::default();

        // New StoreWriter, but the shard already exists, so no switch is reported.
        data.sample.cgroup.memory_current = Some(444);
        let switched = second_writer
            .put(reopened_ts, &data)
            .expect("Failed to store data");
        assert!(!switched);
    }

    // Read everything back in write order and verify timestamp and payload.
    let mut cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
    let expectations = [
        (ts, 111),
        (second_sample_ts, 222),
        (next_shard_ts, 333),
        (reopened_ts, 444),
    ];
    for &(expected_ts, expected_memory) in expectations.iter() {
        let entry = cursor
            .get_next(&get_unix_timestamp(expected_ts), Direction::Forward)
            .expect("Failed to read sample")
            .expect("Did not find stored sample");
        assert_ts!(entry.0, expected_ts);
        assert_eq!(entry.1.sample.cgroup.memory_current, Some(expected_memory));
    }
}