CoTask&lt;IOResult&gt; StorageOperator::handleUpdate()

Defined in src/storage/service/StorageOperator.cc, lines 333–514.


// Handle an update (write/remove) request on a storage target that belongs to a
// replication chain: validate the request, lock the chunk, apply the update
// locally, forward it to the successor target, then commit locally.
//
// Flow:
//   1. validate request / resolve target
//   2. lock the chunk (serializes concurrent updates to the same chunk)
//   3. apply the update to the local target
//   4. forward to the successor and cross-check commit version / checksum
//   5. commit locally
//
// `target` is an in/out parameter: it is re-resolved from the target map after
// the chunk lock is acquired (the chain may have changed while waiting), so the
// caller's pointer can be replaced.
//
// Returns the commit result (with the local update's length/checksum patched
// in) on success, or an error status describing the first failing stage.
CoTask<IOResult> StorageOperator::handleUpdate(ServiceRequestContext &requestCtx,
                                               UpdateReq &req,
                                               net::IBSocket *ibSocket,
                                               TargetPtr &target) {
  // 1. get target.
  // Only the head of the chain accepts requests directly from clients; hitting
  // a non-head node indicates a stale routing table on the client side.
  if (UNLIKELY(req.options.fromClient && !target->isHead)) {
    XLOGF(ERR, "non-head node receive a client update request");
    co_return makeError(StorageClientCode::kRoutingError, "non-head node receive a client update request");
  }
  // Reject client-originated writes while this node is in read-only mode;
  // forwarded (non-client) traffic is still allowed through.
  if (UNLIKELY(req.options.fromClient && config_.read_only())) {
    auto msg = fmt::format("storage is readonly!");
    XLOG(ERR, msg);
    co_return makeError(StatusCode::kReadOnlyMode, std::move(msg));
  }
  // An empty chunk id can never be valid — fail fast before taking any lock.
  if (UNLIKELY(req.payload.key.chunkId.data().empty())) {
    auto msg = fmt::format("update request with empty chunk id: {}", req);
    XLOG(ERR, msg);
    co_return makeError(StatusCode::kInvalidArg, std::move(msg));
  }

  XLOGF(DBG1, "Start the replication process, target: {}, tag: {}, req: {}", target->targetId, req.tag, req);

  // Open a trace entry for this update so the whole pipeline
  // (update -> forward -> commit) can be reconstructed afterwards.
  const auto &appInfo = components_.getAppInfo();
  auto trace = storageEventTrace_.newEntry(StorageEventTrace{
      .clusterId = appInfo.clusterId,
      .nodeId = appInfo.nodeId,
      .targetId = target->targetId,
      .updateReq = req,
  });

  // 2. lock chunk.
  // Per-chunk lock serializes concurrent updates to the same chunk; the baton
  // is signalled when the current owner releases the lock, and recordGuard
  // measures how long we waited for it. The request tag identifies the owner
  // in the "wait" log line below.
  folly::coro::Baton baton;
  auto recordGuard = waitChunkLockRecorder.record();
  auto lockGuard = target->storageTarget->lockChunk(baton, req.payload.key.chunkId, fmt::to_string(req.tag));
  if (!lockGuard.locked()) {
    XLOGF(DBG1,
          "write wait lock on chunk {}, current owner: {}, req: {}",
          req.payload.key.chunkId,
          lockGuard.currentTag(),
          req);
    co_await lockGuard.lock();
  }
  recordGuard.report(true);

  // re-check chain version after acquiring the lock.
  // The chain may have been updated while this coroutine was blocked on the
  // chunk lock, so the target must be resolved again before writing.
  auto targetResult = components_.targetMap.getByChainId(req.payload.key.vChainId);
  if (UNLIKELY(!targetResult)) {
    co_return makeError(std::move(targetResult.error()));
  }
  target = std::move(*targetResult);

  // Shared chunk-engine state: populated by doUpdate below and handed to both
  // the forwarding and the commit steps.
  ChunkEngineUpdateJob chunkEngineJob{};

  // 3. update local target.
  // `buffer`/`remoteBuf` carry the RDMA payload; the last argument disallows
  // implicit chunk creation for client writes on targets flagged with
  // rejectCreateChunk.
  auto buffer = components_.rdmabufPool.get();
  net::RDMARemoteBuf remoteBuf;
  auto updateResult = co_await doUpdate(requestCtx,
                                        req.payload,
                                        req.options,
                                        req.featureFlags,
                                        target->storageTarget,
                                        ibSocket,
                                        buffer,
                                        remoteBuf,
                                        chunkEngineJob,
                                        !(req.options.fromClient && target->rejectCreateChunk));
  trace->updateRes = updateResult;

  // Classify the local update outcome; code 0 means success.
  uint32_t code = updateResult.lengthInfo ? 0 : updateResult.lengthInfo.error().code();
  if (code == 0) {
    // 1. write success.
    // A request arriving with updateVer == 0 adopts the version assigned
    // locally; otherwise the versions must agree exactly.
    if (req.payload.updateVer == 0) {
      req.payload.updateVer = updateResult.updateVer;
    } else if (UNLIKELY(req.payload.updateVer != updateResult.updateVer)) {
      auto msg = fmt::format("write update version mismatch, req {}, result {}", req, updateResult);
      XLOG(DFATAL, msg);
      co_return makeError(StorageCode::kChunkVersionMismatch, std::move(msg));
    }
  } else if (code == StorageCode::kChunkMissingUpdate) {
    // 2. missing update: a gap in the version sequence — do not proceed.
    XLOGF(DFATAL, "write missing update and block, req {}, result {}", req, updateResult);
    co_return updateResult;
  } else if (code == StorageCode::kChunkCommittedUpdate) {
    // 3. committed update, considered as a successful write.
    // The chunk already holds this version committed; report the request's own
    // length/version back without re-forwarding.
    updateResult.lengthInfo = req.payload.length;
    updateResult.updateVer = req.payload.updateVer;
    updateResult.commitVer = req.payload.updateVer;
    XLOGF(DFATAL, "write committed update, req {}, result {}", req, updateResult);
    co_return updateResult;
  } else if (code == StorageCode::kChunkStaleUpdate) {
    // 4. stale update, considered as a successful write.
    // Note: unlike the committed-update case, execution continues below so the
    // request is still forwarded and committed.
    updateResult.lengthInfo = req.payload.length;
    updateResult.updateVer = req.payload.updateVer;
    XLOGF(CRITICAL, "write stale update, req {}, result {}", req, updateResult);
  } else if (code == StorageCode::kChunkAdvanceUpdate) {
    // 5. advance update: request version is ahead of the chunk — do not proceed.
    XLOGF(DFATAL, "write advance update, req {}, result {}", req, updateResult);
    co_return updateResult;
  } else {
    // Any other failure is returned to the caller unchanged.
    XLOGF(CRITICAL, "write update failed, req {}, result {}", req, updateResult);
    co_return updateResult;
  }

  XLOGF(DBG1, "Updated local chunk, target: {}, tag: {}, result: {}", target->targetId, req.tag, updateResult);

  // 4. forward to successor.
  // Build the commit descriptor from the locally assigned update version.
  CommitIO commitIO;
  commitIO.key = req.payload.key;
  commitIO.commitVer = updateResult.updateVer;
  commitIO.isRemove = req.payload.isRemove();

  // NOTE(review): forwardWithRetry appears to update commitIO in place (the
  // mismatch check below and the isSyncing read rely on it) — confirm against
  // ReliableForwarding's contract.
  auto forwardResult = co_await components_.reliableForwarding
                           .forwardWithRetry(requestCtx, req, remoteBuf, chunkEngineJob, target, commitIO);
  if (UNLIKELY(commitIO.commitVer != updateResult.updateVer)) {
    auto msg = fmt::format("commit version mismatch, req: {}, successor {} != local {}",
                           req,
                           commitIO.commitVer,
                           updateResult.updateVer);
    XLOG(DFATAL, msg);
    co_return makeError(StorageCode::kChunkVersionMismatch, std::move(msg));
  }

  XLOGF(DBG1,
        "Forwarded update to successor {}, target: {}, tag: {}, result: {}",
        (target->successor ? target->successor->targetInfo.targetId : TargetId{0}),
        target->targetId,
        req.tag,
        forwardResult);
  trace->forwardRes = forwardResult;
  trace->commitIO = commitIO;

  if (forwardResult.lengthInfo) {
    // Cross-check the successor's checksum against ours. Removes are exempt
    // when either side has no checksum (or during syncing), see below.
    if (commitIO.isRemove && (forwardResult.checksum.type == ChecksumType::NONE ||
                              updateResult.checksum.type == ChecksumType::NONE || commitIO.isSyncing)) {
      // The known issue is that during the delete operation, it is possible for one side to encounter a "chunk not
      // found" situation.
      XLOGF(INFO,
            "Remove op local checksum {} not equal to checksum {} generated by successor, key: {}, syncing: {}",
            updateResult.checksum,
            forwardResult.checksum,
            req.payload.key,
            commitIO.isSyncing);
    } else if (forwardResult.checksum != updateResult.checksum) {
      // A real checksum divergence between replicas; DFATAL is suppressed only
      // when fault injection is deliberately perturbing the request.
      auto msg = fmt::format("Local checksum {} not equal to checksum {} generated by successor, key: {}",
                             updateResult.checksum,
                             forwardResult.checksum,
                             req.payload.key);
      XLOG_IF(DFATAL, !requestCtx.debugFlags.faultInjectionEnabled(), msg);
      co_return makeError(StorageClientCode::kChecksumMismatch, std::move(msg));
    }
  } else if (forwardResult.lengthInfo.error().code() != StorageCode::kNoSuccessorTarget) {
    // kNoSuccessorTarget means this node is the tail — fall through to commit.
    // Any other forwarding error aborts the request.
    co_return forwardResult;
  }

  // 5. commit.
  auto commitResult =
      co_await doCommit(requestCtx, commitIO, req.options, chunkEngineJob, req.featureFlags, target->storageTarget);

  code = commitResult.lengthInfo ? 0 : commitResult.lengthInfo.error().code();

  if (LIKELY(code == 0)) {
    // 1. commit success.
  } else if (code == StorageCode::kChunkStaleCommit) {
    // 2. stale commit, considered as a successful commit; patch the commit
    // version so the caller sees the version that was actually written.
    XLOGF(INFO, "write stale commit, req {}, result {}", req, commitResult);
    commitResult.commitVer = updateResult.updateVer;
  } else {
    // 3. commit fail.
    XLOGF(ERR, "write commit fail, req {}, result {}", req, commitResult);
    co_return commitResult;
  }

  // Return the local update's length and checksum to the caller; the commit
  // result itself does not carry them.
  commitResult.lengthInfo = updateResult.lengthInfo;
  commitResult.checksum = updateResult.checksum;

  XLOGF(DBG1, "Committed local chunk, target: {}, tag: {}, result: {}", target->targetId, req.tag, commitResult);
  trace->commitRes = commitResult;

  co_return commitResult;
}