CoTask ReliableUpdate::update()

in src/storage/service/ReliableUpdate.cc [16:127]


CoTask<IOResult> ReliableUpdate::update(ServiceRequestContext &requestCtx,
                                        UpdateReq &req,
                                        net::IBSocket *ibSocket,
                                        TargetPtr &target) {
  XLOGF(DBG1, "Start reliable update, tag: {}, req: {}", req.tag, req);

  if (UNLIKELY(stopped_)) {
    auto msg = fmt::format("req is refused because of stopping, req {}", req);
    XLOG(ERR, msg);
    co_return makeError(RPCCode::kRequestRefused, std::move(msg));
  }

  // 1. check if channel id is valid.
  if (req.tag.channel.id == ChannelId{0}) {
    XLOGF(DFATAL,
          "{} request has invalid message tag {}: {}",
          magic_enum::enum_name(req.payload.updateType),
          req.tag,
          req);
    co_return makeError(StorageClientCode::kFoundBug);
  }

  // 2. get cached.
  auto clientId = req.tag.clientId;
  auto reqResult = shards_.withLock(
      [&](ClientMap &map) {
        auto &clientStatus = map[clientId];
        if (clientStatus == nullptr) {
          clientStatus = std::make_shared<ClientStatus>();
        }
        auto key = std::pair<ChainId, ChannelId>(req.payload.key.vChainId.chainId, req.tag.channel.id);
        auto &reqResult = clientStatus->channelMap[key];
        clientStatus->lastUsedTime = UtcClock::now();
        return std::shared_ptr<ReqResult>(clientStatus, &reqResult);
      },
      clientId);

  // 3. lock channel.
  auto lockRecordGuard = waitChannelLockRecorder.record();
  folly::coro::Baton baton;
  auto lock = target->storageTarget->tryLockChannel(baton, fmt::format("{}:{}", clientId, req.tag.channel.id));
  if (!lock.locked()) {
    reliableUpdateWaited.addSample(1);
    XLOGF(ERR, "Channel is locked, need retry, tag: {}, req: {}", req.tag, req);
    co_return makeError(StorageCode::kChannelIsLocked);
  }
  lockRecordGuard.report(true);

  IOResult updateResult;
  if (req.tag.channel.seqnum < reqResult->channelSeqnum) {
    reliableUpdateDuplidate.addSample(1);
    XLOGF(WARN, "Find a duplicate update, tag: {}, cached result: {}, req: {}", req.tag, *reqResult, req);
    co_return makeError(StorageClientCode::kDuplicateUpdate);
  }

  // 4. return cached result.
  if (req.tag.channel.seqnum == reqResult->channelSeqnum &&
      target->storageTarget->generationId() == reqResult->generationId) {
    if (req.tag.requestId != reqResult->requestId) {
      XLOGF(DFATAL,
            "[BUG] Message tag {} is already assigned to another update, cached result: {}, req: {}",
            req.tag,
            *reqResult,
            req);
      co_return makeError(StorageClientCode::kFoundBug);
    }

    if (reqResult->updateResult.lengthInfo.hasValue()) {
      if (req.payload.updateVer == 0 || req.payload.updateVer == reqResult->updateResult.updateVer) {
        updateResult = reqResult->updateResult;

        if (*updateResult.lengthInfo != req.payload.length && !req.payload.isExtend()) {
          updateResult.lengthInfo = req.payload.length;
          XLOGF(WARN,
                "Cached length info {} not equal to write size in request {}, fixed update result: {}",
                reqResult->updateResult.lengthInfo,
                req,
                updateResult);
        }

        reliableUpdateCached.addSample(1);
        XLOGF(DBG1, "Return cached update result, tag: {}, cached result: {}, req: {}", req.tag, *reqResult, req);
        co_return updateResult;
      } else {
        XLOGF(CRITICAL,
              "Cached update version not equal to request update version, req:{}, cached result: {}",
              req,
              *reqResult);
      }
    } else if (req.payload.updateVer == 0 && !target->storageTarget->useChunkEngine() &&
               reqResult->succUpdateVer != 0) {
      XLOGF(CRITICAL, "Pick up previous update version, tag: {}, cached result: {}, req: {}", req.tag, *reqResult, req);
      req.payload.updateVer = reqResult->succUpdateVer;
    }
  }

  // 5. start a new task.
  auto recordGuard = reliableUpdateRecorder.record();
  updateResult = co_await components_.storageOperator.handleUpdate(requestCtx, req, ibSocket, target);
  if (LIKELY(bool(updateResult.lengthInfo))) {
    recordGuard.succ();
  }

  *reqResult = {req.tag.channel.seqnum,
                req.tag.requestId,
                updateResult,
                req.payload.updateVer,
                target->storageTarget->generationId()};

  XLOGF(DBG1, "Completed reliable update, tag: {}, result: {}", req.tag, *reqResult);
  co_return updateResult;
}