Result ChunkMetaStore::loadAllocateState()

in src/storage/store/ChunkMetaStore.cc [658:828]


/// Returns the allocation state registered for `chunkSize`, loading it from the KV store on first use.
///
/// Steps performed on the first (unloaded) call:
///   0. read the per-chunk-size meta version (a missing key is treated as version 0);
///   1. read the allocate index and the allocation starting point (computed and persisted if missing);
///      when the version is 0, run the 0 -> 1 migration that rebuilds the removed / recycled counters;
///   2. read all CREATED positions;
///   3. read all RECYCLED positions;
///   4. read the persisted counters;
///   5. read the timestamp carried by the first REMOVED key.
///
/// @param chunkSize  chunk size used as the lookup key into `allocateState_`.
/// @return pointer to the state object owned by `allocateState_`;
///         `StorageCode::kChunkInvalidChunkSize` if no state is registered for `chunkSize`;
///         otherwise any error produced by the KV store or by (de)serialization.
///
/// `state.loaded` is only set after every step succeeded, so a failed load is attempted again on the next call.
Result<ChunkMetaStore::AllocateState *> ChunkMetaStore::loadAllocateState(uint32_t chunkSize) {
  auto it = allocateState_.find(chunkSize);
  if (UNLIKELY(it == allocateState_.end())) {
    return makeError(StorageCode::kChunkInvalidChunkSize, fmt::format("invalid chunk size {}", chunkSize));
  }
  auto &state = *it->second;
  if (state.loaded) {
    return &state;
  }

  // Re-check after acquiring createMutex: another caller may have finished loading while we were waiting.
  auto lock = std::unique_lock(state.createMutex);
  if (state.loaded) {
    return &state;
  }

  // 0. load target version.
  uint32_t version = 0;
  auto versionKeyStr = serde::serializeBytes(VersionKey{chunkSize});
  auto versionResult = kv_->get(versionKeyStr);
  if (LIKELY(bool(versionResult))) {
    RETURN_AND_LOG_ON_ERROR(serde::deserialize(version, *versionResult));
  } else if (versionResult.error().code() == StatusCode::kKVStoreNotFound) {
    // it's ok and init with 0.
  } else {
    RETURN_AND_LOG_ON_ERROR(versionResult);
  }

  // 1. load allocate index.
  auto result = kv_->get(serializeKey(AllocateIndexKey{state.chunkSize}));
  if (LIKELY(bool(result))) {
    RETURN_AND_LOG_ON_ERROR(serde::deserialize(state.allocateIndex, *result));
  } else if (result.error().code() == StatusCode::kKVStoreNotFound) {
    // it's ok and init with 0.
  } else {
    RETURN_AND_LOG_ON_ERROR(result);
  }

  // Load the allocation starting point. When it has never been stored, derive it from the recorded size of
  // file 0 of this chunk size, rounded up to a multiple of the configured allocate size, and persist it.
  auto allocateStartKey = serializeKey(AllocateStartKey{state.chunkSize});
  result = kv_->get(allocateStartKey);
  if (LIKELY(bool(result))) {
    uint64_t startingPoint{};
    RETURN_AND_LOG_ON_ERROR(serde::deserialize(startingPoint, *result));
    state.startingPoint = startingPoint;
  } else if (result.error().code() == StatusCode::kKVStoreNotFound) {
    uint64_t fileSize{};
    FileSizeKey fileSizeKey;
    fileSizeKey.fileId = ChunkFileId{state.chunkSize, 0};
    auto fileSizeKeyStr = serde::serializeBytes(fileSizeKey);
    // Distinct name so the outer `result` is not shadowed.
    auto fileSizeResult = kv_->get(fileSizeKeyStr);
    if (LIKELY(bool(fileSizeResult))) {
      RETURN_AND_LOG_ON_ERROR(serde::deserialize(fileSize, *fileSizeResult));
    } else if (fileSizeResult.error().code() == StatusCode::kKVStoreNotFound) {
      // it's ok and init with 0.
    } else {
      RETURN_AND_LOG_ON_ERROR(fileSizeResult);
    }
    auto allocateSize = config_.allocate_size();
    // NOTE(review): assumes allocate_size() is non-zero; a zero value would divide by zero here.
    state.startingPoint = (fileSize + allocateSize - 1) / allocateSize * allocateSize;
    RETURN_AND_LOG_ON_ERROR(kv_->put(allocateStartKey, serde::serializeBytes(state.startingPoint.load())));
  } else {
    RETURN_AND_LOG_ON_ERROR(result);
  }

  if (version == 0) {
    // version 0->1: fix removed and recycled count.
    // All changes (version bump, key removals, counter rewrites) go through one batch committed below.
    auto batchOp = kv_->createBatchOps();
    version = 1;
    batchOp->put(versionKeyStr, serde::serializeBytes(version));
    // With a starting point of 0 no key can lie below it, so only the version is written.
    if (state.startingPoint > 0) {
      uint64_t removedCount = 0;
      uint64_t recycledCount = 0;
      // scan removed key and re-count it.
      RETURN_AND_LOG_ON_ERROR(kv_->iterateKeysWithPrefix(
          serializeKey(ChunkSizePrefix<MetaKeyType::REMOVED>{state.chunkSize}),
          std::numeric_limits<uint32_t>::max(),
          [&](std::string_view key, auto) -> Result<Void> {
            RemovedKey removedKey;
            RETURN_AND_LOG_ON_ERROR(deserializeKey(key, removedKey));
            if (removedKey.pos.offset < state.startingPoint.load()) {
              // Position lies below the starting point: drop the key instead of counting it.
              batchOp->remove(key);
            } else {
              ++removedCount;
            }
            return Void{};
          },
          nullptr));
      // scan recycled key and re-count it.
      RETURN_AND_LOG_ON_ERROR(kv_->iterateKeysWithPrefix(
          serializeKey(ChunkSizePrefix<MetaKeyType::RECYCLED>{chunkSize}),
          std::numeric_limits<uint32_t>::max(),
          [&](std::string_view key, auto) -> Result<Void> {
            RecycledKey recycledKey;
            RETURN_AND_LOG_ON_ERROR(deserializeKey(key, recycledKey));
            if (recycledKey.pos.offset < state.startingPoint.load()) {
              // Position lies below the starting point: drop the key instead of counting it.
              batchOp->remove(key);
            } else {
              // A recycled key contributes to recycledCount only; removed keys were tallied by the scan above.
              ++recycledCount;
            }
            return Void{};
          },
          nullptr));

      batchOp->put(serializeKey(RemovedCountKey{chunkSize}), serde::serializeBytes(removedCount));
      batchOp->put(serializeKey(RecycledCountKey{chunkSize}), serde::serializeBytes(recycledCount));
      batchOp->put(serializeKey(ReusedCountKey{chunkSize}), serde::serializeBytes(uint64_t{}));
      batchOp->put(serializeKey(HoleCountKey{chunkSize}), serde::serializeBytes(uint64_t{}));
      XLOGF(WARNING, "chunk size {} version 1 fix removed {} recycled {}", chunkSize, removedCount, recycledCount);
    }

    RETURN_AND_LOG_ON_ERROR(batchOp->commit());
  }

  // 2. load created chunks.
  std::vector<ChunkPosition> createdChunks;
  RETURN_AND_LOG_ON_ERROR(kv_->iterateKeysWithPrefix(
      serializeKey(ChunkSizePrefix<MetaKeyType::CREATED>{chunkSize}),
      std::numeric_limits<uint32_t>::max(),
      [&](std::string_view key, auto) -> Result<Void> {
        CreatedKey createdKey;
        RETURN_AND_LOG_ON_ERROR(deserializeKey(key, createdKey));
        createdChunks.push_back(createdKey.pos);
        XLOGF(DBG5, "load created key: {}", createdKey);
        return Void{};
      },
      nullptr));
  // Stored in the reverse of iteration order.
  // NOTE(review): presumably so consumers can take the first-iterated position from the back — confirm.
  std::reverse(createdChunks.begin(), createdChunks.end());

  // 3. load unused chunks.
  std::vector<ChunkPosition> recycledChunks;
  RETURN_AND_LOG_ON_ERROR(kv_->iterateKeysWithPrefix(
      serializeKey(ChunkSizePrefix<MetaKeyType::RECYCLED>{chunkSize}),
      std::numeric_limits<uint32_t>::max(),
      [&](std::string_view key, auto) -> Result<Void> {
        RecycledKey recycledKey;
        RETURN_AND_LOG_ON_ERROR(deserializeKey(key, recycledKey));
        recycledChunks.push_back(recycledKey.pos);
        XLOGF(DBG5, "load recycled key: {}", recycledKey);
        return Void{};
      },
      nullptr));

  // 4. load size.
  RETURN_AND_LOG_ON_ERROR(getSize(serializeKey(CreatedCountKey{state.chunkSize}), state.createdCount));
  RETURN_AND_LOG_ON_ERROR(getSize(serializeKey(UsedCountKey{state.chunkSize}), state.usedCount));
  RETURN_AND_LOG_ON_ERROR(getSize(serializeKey(RemovedCountKey{state.chunkSize}), state.removedCount));
  RETURN_AND_LOG_ON_ERROR(getSize(serializeKey(RecycledCountKey{state.chunkSize}), state.recycledCount));
  RETURN_AND_LOG_ON_ERROR(getSize(serializeKey(ReusedCountKey{state.chunkSize}), state.reusedCount));
  RETURN_AND_LOG_ON_ERROR(getSize(serializeKey(HoleCountKey{state.chunkSize}), state.holeCount));

  // 5. load oldest removed timestamp.
  // NOTE(review): presumably a limit of 0 visits no key and reports the first key under the prefix through
  // nextValidKey — confirm against the KV store's iterateKeysWithPrefix contract.
  std::optional<std::string> nextValidKey;
  RETURN_AND_LOG_ON_ERROR(kv_->iterateKeysWithPrefix(
      serializeKey(ChunkSizePrefix<MetaKeyType::REMOVED>{state.chunkSize}),
      0,
      [](auto, auto) -> Result<Void> { return Void{}; },
      &nextValidKey));
  UtcTime oldestRemovedTimestamp{};
  if (nextValidKey) {
    RemovedKey removedKey;
    RETURN_AND_LOG_ON_ERROR(deserializeKey(*nextValidKey, removedKey));
    oldestRemovedTimestamp = UtcTime::fromMicroseconds(removedKey.microsecond);
  }

  // Publish the collected lists and mark the state loaded only after every step above succeeded.
  state.recycledChunks = std::move(recycledChunks);
  state.createdChunks = std::move(createdChunks);
  state.oldestRemovedTimestamp = oldestRemovedTimestamp;
  state.loaded = true;
  return &state;
}