bool PieceWiseCacheAdapter::updatePieceProcessing()

in cachelib/cachebench/workload/PieceWiseCache.cpp [424:514]


// Advances the state of a piece-wise request after one cache operation.
//
// Depending on `result`, this records the matching hit / ingress stats and,
// when more work is required, rewrites `rw` (piece key, piece size, op type,
// fetch index) so that the caller can issue the next operation.
//
// @param rw      request wrapper; updated in place
// @param result  outcome of the operation that was just performed for `rw`
//
// @return true if nothing further needs to be issued for this request;
//         false if `rw` has been prepared for another get/set. An
//         unrecognized result type is logged and also yields false.
bool PieceWiseCacheAdapter::updatePieceProcessing(PieceWiseReqWrapper& rw,
                                                  OpResultType result) {
  if (result == OpResultType::kSetSkip) {
    // No need to set subsequent pieces.
    return true;
  }

  if (result == OpResultType::kGetMiss) {
    // The missing bytes are fetched from upstream, so count them as ingress.
    size_t ingressBytes = rw.headerSize + rw.cachePieces->getRemainingBytes();
    if (!rw.isHeaderPiece) {
      // The fetch index was advanced ahead of time, so
      // getCurFetchingPieceIndex() already points at the next piece; the
      // piece that missed is the one right before it.
      const auto missPieceIndex =
          rw.cachePieces->getCurFetchingPieceIndex() - 1;
      ingressBytes += rw.cachePieces->getSizeOfAPiece(missPieceIndex);
    }
    stats_.recordIngressBytes(ingressBytes, rw.statsAggFields);

    // The current piece is written to the cache next.
    rw.req.setOp(OpType::kSet);
    return false;
  }

  const bool isGetHit = (result == OpResultType::kGetHit);
  if (!isGetHit && result != OpResultType::kSetSuccess &&
      result != OpResultType::kSetFailure) {
    XLOG(INFO) << "Unsupported OpResultType: " << static_cast<int>(result);
    return false;
  }

  // From here on the result is one of kGetHit / kSetSuccess / kSetFailure.

  // Index of the piece that would be processed next.
  const auto nextPieceIndex = rw.cachePieces->getCurFetchingPieceIndex();

  if (isGetHit) {
    if (rw.isHeaderPiece) {
      stats_.recordPieceHeaderHit(rw.sizes[0], rw.statsAggFields);
    } else {
      // The piece that was just hit precedes the next one. A complete piece
      // is always fetched, so its full size is counted.
      const auto hitPieceSize =
          rw.cachePieces->getSizeOfAPiece(nextPieceIndex - 1);
      stats_.recordPieceBodyHit(hitPieceSize, rw.statsAggFields);
    }
  }

  const bool nextWithinBound =
      rw.cachePieces->isPieceWithinBound(nextPieceIndex);

  // Pieces at or beyond the pieces number limit (maxCachePieces_) are never
  // stored, so only continue while the next piece is both requested and
  // below that limit.
  if (nextWithinBound && nextPieceIndex < maxCachePieces_) {
    // The header piece has been handled by now; what follows is a body piece.
    rw.pieceKey = GenericPieces::createPieceKey(
        rw.baseKey, nextPieceIndex, rw.cachePieces->getPiecesPerGroup());
    rw.sizes[0] = rw.cachePieces->getSizeOfAPiece(nextPieceIndex);

    // A hit keeps fetching; once a set has started, every subsequent piece
    // is set as well.
    rw.req.setOp(isGetHit ? OpType::kGet : OpType::kSet);

    rw.isHeaderPiece = false;
    rw.cachePieces->updateFetchIndex();
    return false;
  }

  // Nothing more to issue for this request; record the final hit stats.
  if (isGetHit) {
    if (!nextWithinBound) {
      // Every requested piece has been fetched: this is a full cache hit.
      stats_.recordPieceFullHit(rw.headerSize, rw.cachePieces->getTotalSize(),
                                rw.statsAggFields);
    } else {
      // The next piece is still requested, so the check above can only have
      // failed because nextPieceIndex >= maxCachePieces_. The remaining
      // pieces are not cached and are fetched from upstream directly.
      stats_.recordIngressBytes(
          rw.headerSize + rw.cachePieces->getRemainingBytes(),
          rw.statsAggFields);
    }
  }
  return true;
}