in cachelib/cachebench/workload/PieceWiseCache.cpp [424:514]
bool PieceWiseCacheAdapter::updatePieceProcessing(PieceWiseReqWrapper& rw,
OpResultType result) {
// we are only done if we have got everything.
bool done = false;
if (result == OpResultType::kGetHit || result == OpResultType::kSetSuccess ||
result == OpResultType::kSetFailure) {
// The piece index we need to fetch next
auto nextPieceIndex = rw.cachePieces->getCurFetchingPieceIndex();
// Record the cache hit stats
if (result == OpResultType::kGetHit) {
if (rw.isHeaderPiece) {
stats_.recordPieceHeaderHit(rw.sizes[0], rw.statsAggFields);
} else {
auto resultPieceIndex = nextPieceIndex - 1;
// We always fetch a complete piece.
auto pieceSize = rw.cachePieces->getSizeOfAPiece(resultPieceIndex);
stats_.recordPieceBodyHit(pieceSize, rw.statsAggFields);
}
}
// For pieces that are beyond pieces number limit (maxCachePieces_),
// we don't store them
if (rw.cachePieces->isPieceWithinBound(nextPieceIndex) &&
nextPieceIndex < maxCachePieces_) {
// First set the correct key. Header piece has already been fetched,
// this is now a body piece.
rw.pieceKey = GenericPieces::createPieceKey(
rw.baseKey, nextPieceIndex, rw.cachePieces->getPiecesPerGroup());
// Set the size of the piece
rw.sizes[0] = rw.cachePieces->getSizeOfAPiece(nextPieceIndex);
if (result == OpResultType::kGetHit) {
// Fetch next piece
rw.req.setOp(OpType::kGet);
} else {
// Once we start to set a piece, we set all subsequent pieces
rw.req.setOp(OpType::kSet);
}
// Update the piece fetch index
rw.isHeaderPiece = false;
rw.cachePieces->updateFetchIndex();
} else {
if (result == OpResultType::kGetHit) {
if (!rw.cachePieces->isPieceWithinBound(nextPieceIndex)) {
// We have got all the pieces that are requested, record the full
// cache hit stats
auto totalSize = rw.cachePieces->getTotalSize();
stats_.recordPieceFullHit(rw.headerSize, totalSize,
rw.statsAggFields);
} else {
// The remaining pieces are beyond maxCachePieces_, we don't store
// them in cache and fetch them from upstream directly
if (nextPieceIndex >= maxCachePieces_) {
stats_.recordIngressBytes(
rw.headerSize + rw.cachePieces->getRemainingBytes(),
rw.statsAggFields);
}
}
}
// we are done
done = true;
}
} else if (result == OpResultType::kGetMiss) {
// Record ingress bytes since we will fetch the bytes from upstream.
size_t ingressBytes;
if (rw.isHeaderPiece) {
ingressBytes = rw.headerSize + rw.cachePieces->getRemainingBytes();
} else {
// Note we advance the piece index ahead of time, so
// getCurFetchingPieceIndex() returns the next piece index
auto missPieceIndex = rw.cachePieces->getCurFetchingPieceIndex() - 1;
ingressBytes = rw.headerSize +
rw.cachePieces->getSizeOfAPiece(missPieceIndex) +
rw.cachePieces->getRemainingBytes();
}
stats_.recordIngressBytes(ingressBytes, rw.statsAggFields);
// Perform set operation next for the current piece
rw.req.setOp(OpType::kSet);
} else if (result == OpResultType::kSetSkip) {
// No need to set subsequent pieces.
done = true;
} else {
XLOG(INFO) << "Unsupported OpResultType: " << (int)result;
}
return done;
}