Result<Void> ChunkMetaStore::createChunk()

in src/storage/store/ChunkMetaStore.cc [427:535]


/// Assigns an on-disk position to a new chunk and persists its metadata.
///
/// A position is taken from `state.recycledChunks` when one is available,
/// otherwise from `state.createdChunks`. A single KV batch is then committed
/// that removes the position's bookkeeping key, stores the bumped
/// reused/used counter, writes the chunk metadata and stores the new created
/// size. If the commit fails, the in-memory counter, the position pool and
/// `createdSize_` are rolled back.
///
/// @param chunkId         Chunk whose metadata entry is written.
/// @param meta            In/out: `innerFileId` and `innerOffset` are overwritten
///                        with the chosen position before the commit; they are
///                        not restored if the commit fails.
/// @param chunkSize       Chunk size used to look up the allocate state and to
///                        build the bookkeeping keys.
/// @param executor        Executor on which background recycle/allocate tasks
///                        are scheduled.
/// @param allowToAllocate When false and no recycled position is available,
///                        the call fails with `StorageClientCode::kNoSpace`
///                        instead of taking a newly created position.
/// @return `Void{}` on success. Fails with the error produced by
///         `loadAllocateState`, `recycleRemovedChunks` or `allocateChunks`
///         (via `RETURN_AND_LOG_ON_ERROR`), with `StorageClientCode::kNoSpace`
///         as described above, or with `StorageCode::kChunkMetadataSetError`
///         when no created position is available or the batch commit fails.
Result<Void> ChunkMetaStore::createChunk(const ChunkId &chunkId,
                                         ChunkMetadata &meta,
                                         uint32_t chunkSize,
                                         folly::CPUThreadPoolExecutor &executor,
                                         bool allowToAllocate) {
  auto recordGuard = storageMetaCreate.record();

  // 1. load the allocate state for this chunk size.
  auto stateResult = loadAllocateState(chunkSize);
  RETURN_AND_LOG_ON_ERROR(stateResult);
  auto &state = **stateResult;

  ChunkPosition pos;
  bool useRecycledChunk = false;
  auto batchOp = kv_->createBatchOps();

  // 2. pick a position while holding createMutex.
  {
    // Record how long this call waited to acquire createMutex.
    auto startTime = RelativeTime::now();
    auto lock = std::unique_lock(state.createMutex);
    storageMetaCreateWait.addSample(RelativeTime::now() - startTime);
    if (needRecycleRemovedChunks(state)) {
      // Schedule at most one background recycle: only the caller that flips
      // the flag from false to true submits the task.
      if (!state.recycling.exchange(true)) {
        executor.add([this, &state] { recycleRemovedChunks(state); });
      }
      if (state.recycledChunks.empty()) {
        // createMutex is dropped before recycleMutex is taken and re-taken
        // afterwards, so this thread never waits for recycleMutex while
        // holding createMutex. The emptiness check is repeated because the
        // pool may have been refilled while createMutex was released.
        // NOTE(review): the `true` argument presumably tells the callee that
        // recycleMutex is already held by the caller - confirm.
        lock.unlock();
        auto recycleLock = std::unique_lock(state.recycleMutex);
        lock.lock();
        if (state.recycledChunks.empty()) {
          RETURN_AND_LOG_ON_ERROR(recycleRemovedChunks(state, true));
        }
      }
    }
    useRecycledChunk = !state.recycledChunks.empty();
    if (useRecycledChunk) {
      // 2.1 use recycled chunks.
      pos = state.recycledChunks.back();
      state.recycledChunks.pop_back();
      batchOp->put(serializeKey(ReusedCountKey{state.chunkSize}), serde::serializeBytes(++state.reusedCount));

      RecycledKey recycledKey;
      recycledKey.chunkSize = chunkSize;
      recycledKey.pos = pos;
      batchOp->remove(serializeKey(recycledKey));
    } else {
      if (!allowToAllocate) {
        auto msg = fmt::format("chunk {} create new chunk write limit", chunkId);
        XLOG(ERR, msg);
        return makeError(StorageClientCode::kNoSpace, std::move(msg));
      }

      // 2.2 use created chunks.
      // Top up in the background once the remaining created positions cover
      // no more than half of the configured allocate size.
      if (state.createdChunks.size() * state.chunkSize <= config_.allocate_size() / 2) {
        if (!state.allocating.exchange(true)) {
          executor.add([this, &state] { allocateChunks(state); });
        }
      }
      if (state.createdChunks.empty()) {
        // Same lock hand-off as in the recycle path above, using allocateMutex.
        lock.unlock();
        auto allocateLock = std::unique_lock(state.allocateMutex);
        lock.lock();
        if (state.createdChunks.empty()) {
          RETURN_AND_LOG_ON_ERROR(allocateChunks(state, true));
        }
      }
      // Calling back()/pop_back() on an empty vector is undefined behaviour,
      // so fail explicitly if allocation reported success without producing
      // any position.
      if (UNLIKELY(state.createdChunks.empty())) {
        auto msg = fmt::format("chunk {} create failed: no created chunk available after allocation", chunkId);
        XLOG(ERR, msg);
        return makeError(StorageCode::kChunkMetadataSetError, std::move(msg));
      }
      pos = state.createdChunks.back();
      state.createdChunks.pop_back();
      batchOp->put(serializeKey(UsedCountKey{state.chunkSize}), serde::serializeBytes(++state.usedCount));

      CreatedKey extendKey;
      extendKey.chunkSize = chunkSize;
      extendKey.pos = pos;
      batchOp->remove(serializeKey(extendKey));
    }
  }

  // 3. insert metadata.
  meta.innerFileId = {chunkSize, pos.fileIdx};
  meta.innerOffset = size_t{pos.offset};
  ChunkMetaKey chunkMetaKey(chunkId);
  XLOGF(DBG, "insert meta data {}", meta);
  batchOp->put(chunkMetaKey, serde::serializeBytes(meta));

  // 4. update created size.
  batchOp->put(kCreatedSizeKey, serde::serializeBytes(createdSize_ += chunkSize));

  // 5. commit on kv.
  auto startTime = RelativeTime::now();
  auto result = batchOp->commit();
  storageMetaCreateCommit.addSample(RelativeTime::now() - startTime);
  if (UNLIKELY(!result)) {
    // Undo the in-memory effects of steps 2 and 4: give the position back to
    // the pool it was taken from and revert the matching counter.
    createdSize_ -= chunkSize;
    {
      auto lock = std::unique_lock(state.createMutex);
      if (useRecycledChunk) {
        state.recycledChunks.push_back(pos);
        --state.reusedCount;
      } else {
        state.createdChunks.push_back(pos);
        --state.usedCount;
      }
    }
    auto msg = fmt::format("chunk id {} create failed: {}", chunkId, result.error());
    XLOG(ERR, msg);
    return makeError(StorageCode::kChunkMetadataSetError, std::move(msg));
  }

  recordGuard.succ();
  return Void{};
}