in below/store/src/lib.rs [290:365]
fn new_with_shard<P: AsRef<Path>>(
logger: slog::Logger,
path: P,
shard: u64,
compression_mode: CompressionMode,
format: Format,
) -> Result<Self> {
if !path.as_ref().is_dir() {
std::fs::create_dir(&path).with_context(|| {
format!("Failed to create store path: {}", path.as_ref().display())
})?;
}
let (data_path, index_path) = {
let mut data_path = path.as_ref().to_path_buf();
let mut index_path = data_path.clone();
data_path.push(format!("data_{:011}", shard));
index_path.push(format!("index_{:011}", shard));
(data_path, index_path)
};
let index = OpenOptions::new()
.append(true)
.create(true)
.open(index_path.as_path())
.with_context(|| format!("Failed to open index file: {}", index_path.display()))?;
nix::fcntl::flock(
index.as_raw_fd(),
nix::fcntl::FlockArg::LockExclusiveNonblock,
)
.with_context(|| {
format!(
"Failed to acquire file lock on index file: {}",
index_path.display(),
)
})?;
let data = OpenOptions::new()
.append(true)
.create(true)
.open(data_path.as_path())
.with_context(|| format!("Failed to open data file: {}", data_path.display()))?;
nix::fcntl::flock(
data.as_raw_fd(),
nix::fcntl::FlockArg::LockExclusiveNonblock,
)
.with_context(|| {
format!(
"Failed to acquire file lock on data file: {}",
data_path.display(),
)
})?;
let data_len = data
.metadata()
.with_context(|| {
format!(
"Failed to get metadata of data file: {}",
data_path.display()
)
})?
.len();
Ok(StoreWriter {
logger,
dir: path.as_ref().to_path_buf(),
index,
data,
data_len,
shard,
// First compressed write initializes the compressor
compressor: None,
compression_mode,
format,
})
}