in src/storage/store/ChunkMetaStore.cc [658:828]
Result<ChunkMetaStore::AllocateState *> ChunkMetaStore::loadAllocateState(uint32_t chunkSize) {
auto it = allocateState_.find(chunkSize);
if (UNLIKELY(it == allocateState_.end())) {
return makeError(StorageCode::kChunkInvalidChunkSize, fmt::format("invalid chunk size {}", chunkSize));
}
auto &state = *it->second;
if (state.loaded) {
return &state;
}
auto lock = std::unique_lock(state.createMutex);
if (state.loaded) {
return &state;
}
// 0. load target version.
uint32_t version = 0;
auto versionKeyStr = serde::serializeBytes(VersionKey{chunkSize});
auto versionResult = kv_->get(versionKeyStr);
if (LIKELY(bool(versionResult))) {
RETURN_AND_LOG_ON_ERROR(serde::deserialize(version, *versionResult));
} else if (versionResult.error().code() == StatusCode::kKVStoreNotFound) {
// it's ok and init with 0.
} else {
RETURN_AND_LOG_ON_ERROR(versionResult);
}
// 1. load allocate index.
auto result = kv_->get(serializeKey(AllocateIndexKey{state.chunkSize}));
if (LIKELY(bool(result))) {
RETURN_AND_LOG_ON_ERROR(serde::deserialize(state.allocateIndex, *result));
} else if (result.error().code() == StatusCode::kKVStoreNotFound) {
// it's ok and init with 0.
} else {
RETURN_AND_LOG_ON_ERROR(result);
}
auto allocateStartKey = serializeKey(AllocateStartKey{state.chunkSize});
result = kv_->get(allocateStartKey);
if (LIKELY(bool(result))) {
uint64_t startingPoint{};
RETURN_AND_LOG_ON_ERROR(serde::deserialize(startingPoint, *result));
state.startingPoint = startingPoint;
} else if (result.error().code() == StatusCode::kKVStoreNotFound) {
uint64_t fileSize{};
FileSizeKey fileSizeKey;
fileSizeKey.fileId = ChunkFileId{state.chunkSize, 0};
auto fileSizeKeyStr = serde::serializeBytes(fileSizeKey);
auto result = kv_->get(fileSizeKeyStr);
if (LIKELY(bool(result))) {
RETURN_AND_LOG_ON_ERROR(serde::deserialize(fileSize, *result));
} else if (result.error().code() == StatusCode::kKVStoreNotFound) {
// it's ok and init with 0.
} else {
RETURN_AND_LOG_ON_ERROR(result);
}
auto allocateSize = config_.allocate_size();
state.startingPoint = (fileSize + allocateSize - 1) / allocateSize * allocateSize;
RETURN_AND_LOG_ON_ERROR(kv_->put(allocateStartKey, serde::serializeBytes(state.startingPoint.load())));
} else {
RETURN_AND_LOG_ON_ERROR(result);
}
if (version == 0) {
// version 0->1: fix removed and recycled count.
auto batchOp = kv_->createBatchOps();
version = 1;
batchOp->put(versionKeyStr, serde::serializeBytes(version));
if (state.startingPoint > 0) {
uint64_t removedCount = 0;
uint64_t recycledCount = 0;
// scan removed key and re-count it.
RETURN_AND_LOG_ON_ERROR(kv_->iterateKeysWithPrefix(
serializeKey(ChunkSizePrefix<MetaKeyType::REMOVED>{state.chunkSize}),
std::numeric_limits<uint32_t>::max(),
[&](std::string_view key, auto) -> Result<Void> {
RemovedKey removedKey;
RETURN_AND_LOG_ON_ERROR(deserializeKey(key, removedKey));
if (removedKey.pos.offset < state.startingPoint.load()) {
// remove it.
batchOp->remove(key);
} else {
++removedCount;
}
return Void{};
},
nullptr));
// scan recycled key and re-count it.
RETURN_AND_LOG_ON_ERROR(kv_->iterateKeysWithPrefix(
serializeKey(ChunkSizePrefix<MetaKeyType::RECYCLED>{chunkSize}),
std::numeric_limits<uint32_t>::max(),
[&](std::string_view key, auto) -> Result<Void> {
RecycledKey recycledKey;
RETURN_AND_LOG_ON_ERROR(deserializeKey(key, recycledKey));
if (recycledKey.pos.offset < state.startingPoint.load()) {
// remove it.
batchOp->remove(key);
} else {
++recycledCount;
++removedCount;
}
return Void{};
},
nullptr));
batchOp->put(serializeKey(RemovedCountKey{chunkSize}), serde::serializeBytes(removedCount));
batchOp->put(serializeKey(RecycledCountKey{chunkSize}), serde::serializeBytes(recycledCount));
batchOp->put(serializeKey(ReusedCountKey{chunkSize}), serde::serializeBytes(uint64_t{}));
batchOp->put(serializeKey(HoleCountKey{chunkSize}), serde::serializeBytes(uint64_t{}));
XLOGF(WARNING, "chunk size {} version 1 fix removed {} recycled {}", chunkSize, removedCount, recycledCount);
}
RETURN_AND_LOG_ON_ERROR(batchOp->commit());
}
// 2. load created chunks.
std::vector<ChunkPosition> createdChunks;
RETURN_AND_LOG_ON_ERROR(kv_->iterateKeysWithPrefix(
serializeKey(ChunkSizePrefix<MetaKeyType::CREATED>{chunkSize}),
std::numeric_limits<uint32_t>::max(),
[&](std::string_view key, auto) -> Result<Void> {
CreatedKey createdKey;
RETURN_AND_LOG_ON_ERROR(deserializeKey(key, createdKey));
createdChunks.push_back(createdKey.pos);
XLOGF(DBG5, "load created key: {}", createdKey);
return Void{};
},
nullptr));
std::reverse(createdChunks.begin(), createdChunks.end());
// 3. load unused chunks.
std::vector<ChunkPosition> recycledChunks;
RETURN_AND_LOG_ON_ERROR(kv_->iterateKeysWithPrefix(
serializeKey(ChunkSizePrefix<MetaKeyType::RECYCLED>{chunkSize}),
std::numeric_limits<uint32_t>::max(),
[&](std::string_view key, auto) -> Result<Void> {
RecycledKey recycledKey;
RETURN_AND_LOG_ON_ERROR(deserializeKey(key, recycledKey));
recycledChunks.push_back(recycledKey.pos);
XLOGF(DBG5, "load recycled key: {}", recycledKey);
return Void{};
},
nullptr));
// 4. load size.
RETURN_AND_LOG_ON_ERROR(getSize(serializeKey(CreatedCountKey{state.chunkSize}), state.createdCount));
RETURN_AND_LOG_ON_ERROR(getSize(serializeKey(UsedCountKey{state.chunkSize}), state.usedCount));
RETURN_AND_LOG_ON_ERROR(getSize(serializeKey(RemovedCountKey{state.chunkSize}), state.removedCount));
RETURN_AND_LOG_ON_ERROR(getSize(serializeKey(RecycledCountKey{state.chunkSize}), state.recycledCount));
RETURN_AND_LOG_ON_ERROR(getSize(serializeKey(ReusedCountKey{state.chunkSize}), state.reusedCount));
RETURN_AND_LOG_ON_ERROR(getSize(serializeKey(HoleCountKey{state.chunkSize}), state.holeCount));
// 5. load oldest removed timestamp.
std::optional<std::string> nextValidKey;
RETURN_AND_LOG_ON_ERROR(kv_->iterateKeysWithPrefix(
serializeKey(ChunkSizePrefix<MetaKeyType::REMOVED>{state.chunkSize}),
0,
[](auto, auto) -> Result<Void> { return Void{}; },
&nextValidKey));
UtcTime oldestRemovedTimestamp{};
if (nextValidKey) {
RemovedKey removedKey;
RETURN_AND_LOG_ON_ERROR(deserializeKey(*nextValidKey, removedKey));
oldestRemovedTimestamp = UtcTime::fromMicroseconds(removedKey.microsecond);
}
state.recycledChunks = std::move(recycledChunks);
state.createdChunks = std::move(createdChunks);
state.oldestRemovedTimestamp = oldestRemovedTimestamp;
state.loaded = true;
return &state;
}