in src/storage/store/ChunkMetaStore.cc [427:535]
/// Create a chunk: pick a physical position for it — preferring recycled
/// positions over freshly allocated ones — then commit the chunk metadata
/// together with all bookkeeping counters in a single KV batch. On commit
/// failure the in-memory state (position lists, counters, createdSize_) is
/// rolled back so a later attempt can retry cleanly.
///
/// @param chunkId         id of the chunk being created (used in keys/logs)
/// @param meta            metadata to persist; innerFileId and innerOffset are
///                        filled in here from the chosen physical position
/// @param chunkSize       size class of the chunk; selects the allocate state
/// @param executor        pool used to refill positions in the background
///                        (recycle/allocate passes)
/// @param allowToAllocate when false, only recycled positions may be used;
///                        returns kNoSpace if a fresh allocation would be needed
Result<Void> ChunkMetaStore::createChunk(const ChunkId &chunkId,
                                         ChunkMetadata &meta,
                                         uint32_t chunkSize,
                                         folly::CPUThreadPoolExecutor &executor,
                                         bool allowToAllocate) {
  auto recordGuard = storageMetaCreate.record();  // metric guard; succ() marked only on success
  // 1. load the per-size-class allocate state.
  auto stateResult = loadAllocateState(chunkSize);
  RETURN_AND_LOG_ON_ERROR(stateResult);
  auto &state = **stateResult;
  ChunkPosition pos;
  bool useRecycledChunk = false;
  auto batchOp = kv_->createBatchOps();
  {
    // 2. choose a physical position while holding the create lock.
    auto startTime = RelativeTime::now();
    auto lock = std::unique_lock(state.createMutex);
    storageMetaCreateWait.addSample(RelativeTime::now() - startTime);  // lock-contention metric
    if (needRecycleRemovedChunks(state)) {
      // Kick off a background recycle pass; the atomic `recycling` flag
      // ensures only one pass is scheduled at a time.
      if (!state.recycling.exchange(true)) {
        executor.add([this, &state] { recycleRemovedChunks(state); });
      }
      if (state.recycledChunks.empty()) {
        // NOTE(review): createMutex is dropped before acquiring recycleMutex —
        // presumably the background pass holds recycleMutex, so this waits for
        // it to finish without ever holding both locks in the opposite order;
        // confirm against recycleRemovedChunks()'s locking.
        lock.unlock();
        auto recycleLock = std::unique_lock(state.recycleMutex);
        lock.lock();
        // Double-check after relocking: the list may have been refilled while
        // createMutex was released. If still empty, recycle synchronously.
        if (state.recycledChunks.empty()) {
          RETURN_AND_LOG_ON_ERROR(recycleRemovedChunks(state, true));
        }
      }
    }
    useRecycledChunk = !state.recycledChunks.empty();
    if (useRecycledChunk) {
      // 2.1 use recycled chunks.
      pos = state.recycledChunks.back();
      state.recycledChunks.pop_back();
      // Stage: persist the bumped reuse counter and remove the position from
      // the recycled set (it is in use from now on).
      batchOp->put(serializeKey(ReusedCountKey{state.chunkSize}), serde::serializeBytes(++state.reusedCount));
      RecycledKey recycledKey;
      recycledKey.chunkSize = chunkSize;
      recycledKey.pos = pos;
      batchOp->remove(serializeKey(recycledKey));
    } else {
      // No recycled position available; allocating a fresh one requires permission.
      if (!allowToAllocate) {
        auto msg = fmt::format("chunk {} create new chunk write limit", chunkId);
        XLOG(ERR, msg);
        return makeError(StorageClientCode::kNoSpace, std::move(msg));
      }
      // 2.2 use created chunks.
      // Refill in the background once the pre-allocated pool drops to half of
      // the configured allocate size; `allocating` guards against duplicates.
      if (state.createdChunks.size() * state.chunkSize <= config_.allocate_size() / 2) {
        if (!state.allocating.exchange(true)) {
          executor.add([this, &state] { allocateChunks(state); });
        }
      }
      if (state.createdChunks.empty()) {
        // Same lock-dance + double-check pattern as the recycle path above:
        // wait for a background allocation, then allocate synchronously if
        // the pool is still empty.
        lock.unlock();
        auto allocateLock = std::unique_lock(state.allocateMutex);
        lock.lock();
        if (state.createdChunks.empty()) {
          RETURN_AND_LOG_ON_ERROR(allocateChunks(state, true));
        }
      }
      pos = state.createdChunks.back();
      state.createdChunks.pop_back();
      // Stage: persist the bumped used counter and remove the position from
      // the created (pre-allocated) set.
      batchOp->put(serializeKey(UsedCountKey{state.chunkSize}), serde::serializeBytes(++state.usedCount));
      CreatedKey extendKey;
      extendKey.chunkSize = chunkSize;
      extendKey.pos = pos;
      batchOp->remove(serializeKey(extendKey));
    }
  }
  // 3. insert metadata.
  meta.innerFileId = {chunkSize, pos.fileIdx};
  meta.innerOffset = size_t{pos.offset};
  ChunkMetaKey chunkMetaKey(chunkId);
  XLOGF(DBG, "insert meta data {}", meta);
  batchOp->put(chunkMetaKey, serde::serializeBytes(meta));
  // 4. update created size.
  // NOTE(review): createdSize_ is incremented outside createMutex — presumably
  // it is atomic (the new value is serialized into the batch); verify.
  batchOp->put(kCreatedSizeKey, serde::serializeBytes(createdSize_ += chunkSize));
  // 5. commit on kv.
  auto startTime = RelativeTime::now();
  auto result = batchOp->commit();
  storageMetaCreateCommit.addSample(RelativeTime::now() - startTime);
  if (UNLIKELY(!result)) {
    // Commit failed: undo the in-memory bookkeeping so the position can be
    // handed out again. The KV batch itself was not applied.
    createdSize_ -= chunkSize;
    {
      auto lock = std::unique_lock(state.createMutex);
      if (useRecycledChunk) {
        state.recycledChunks.push_back(pos);
        --state.reusedCount;
      } else {
        state.createdChunks.push_back(pos);
        --state.usedCount;
      }
    }
    auto msg = fmt::format("chunk id {} create failed: {}", chunkId, result.error());
    XLOG(ERR, msg);
    return makeError(StorageCode::kChunkMetadataSetError, std::move(msg));
  }
  recordGuard.succ();
  return Void{};
}