cachelib/cachebench/workload/PieceWiseCache.cpp (413 lines of code) (raw):

/* * Copyright (c) Facebook, Inc. and its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "cachelib/cachebench/workload/PieceWiseCache.h" namespace { constexpr double kGB = 1024.0 * 1024 * 1024; } namespace facebook { namespace cachelib { namespace cachebench { void PieceWiseCacheStats::InternalStats::updateTimestamp(uint64_t timestamp) { std::lock_guard<std::mutex> lck(tsMutex_); if (startTimestamp_ > timestamp || startTimestamp_ == 0) { startTimestamp_ = timestamp; } if (endTimestamp_ < timestamp) { endTimestamp_ = timestamp; } } std::pair<uint64_t, uint64_t> PieceWiseCacheStats::InternalStats::getTimestamps() { std::pair<uint64_t, uint64_t> result; { std::lock_guard<std::mutex> lck(tsMutex_); result = {startTimestamp_, endTimestamp_}; } return result; } PieceWiseCacheStats::PieceWiseCacheStats( uint32_t numAggregationFields, const std::unordered_map<uint32_t, std::vector<std::string>>& statsPerAggField) { // Doing a pre-allocation for extraStatsIndexM_ and extraStatsV_ here, // so we can use them afterwards without lock. // c++ guarantees thread safety for its const functions in the absence // of any non-const access. uint32_t extraStatsCount = 0; for (const auto& kv : statsPerAggField) { XCHECK_LT(kv.first, numAggregationFields); std::map<std::string, uint32_t> stat; for (const auto& fieldValue : kv.second) { stat[fieldValue] = extraStatsCount++; } extraStatsIndexM_[kv.first] = std::move(stat); } extraStatsV_ = std::vector<InternalStats>(extraStatsCount); } void PieceWiseCacheStats::recordAccess( uint64_t timestamp, size_t getBytes, size_t getBodyBytes, size_t egressBytes, const std::vector<std::string>& statsAggFields) { // Adjust the timestamp given current sample. stats_.updateTimestamp(timestamp); lastWindowStats_.updateTimestamp(timestamp); recordStats(recordAccessInternal, statsAggFields, getBytes, getBodyBytes, egressBytes); } void PieceWiseCacheStats::recordAccessInternal(InternalStats& stats, size_t getBytes, size_t getBodyBytes, size_t egressBytes) { stats.getBytes.add(getBytes); stats.getBodyBytes.add(getBodyBytes); stats.objGets.inc(); stats.totalEgressBytes.add(egressBytes); } void PieceWiseCacheStats::recordNonPieceHit( size_t hitBytes, size_t hitBodyBytes, const std::vector<std::string>& statsAggFields) { recordStats(recordNonPieceHitInternal, statsAggFields, hitBytes, hitBodyBytes); } void PieceWiseCacheStats::recordNonPieceHitInternal(InternalStats& stats, size_t hitBytes, size_t hitBodyBytes) { stats.getHitBytes.add(hitBytes); stats.getFullHitBytes.add(hitBytes); stats.getHitBodyBytes.add(hitBodyBytes); stats.getFullHitBodyBytes.add(hitBodyBytes); stats.objGetHits.inc(); stats.objGetFullHits.inc(); } void PieceWiseCacheStats::recordPieceHeaderHit( size_t pieceBytes, const std::vector<std::string>& statsAggFields) { recordStats(recordPieceHeaderHitInternal, statsAggFields, pieceBytes); } void PieceWiseCacheStats::recordPieceHeaderHitInternal(InternalStats& stats, size_t pieceBytes) { stats.getHitBytes.add(pieceBytes); stats.objGetHits.inc(); } void PieceWiseCacheStats::recordPieceBodyHit( size_t pieceBytes, const std::vector<std::string>& statsAggFields) { recordStats(recordPieceBodyHitInternal, statsAggFields, pieceBytes); } void PieceWiseCacheStats::recordPieceBodyHitInternal(InternalStats& stats, size_t pieceBytes) { stats.getHitBytes.add(pieceBytes); stats.getHitBodyBytes.add(pieceBytes); } void PieceWiseCacheStats::recordPieceFullHit( size_t headerBytes, size_t bodyBytes, const std::vector<std::string>& statsAggFields) { recordStats(recordPieceFullHitInternal, statsAggFields, headerBytes, bodyBytes); } void PieceWiseCacheStats::recordPieceFullHitInternal(InternalStats& stats, size_t headerBytes, size_t bodyBytes) { stats.getFullHitBytes.add(headerBytes + bodyBytes); stats.getFullHitBodyBytes.add(bodyBytes); stats.objGetFullHits.inc(); } void PieceWiseCacheStats::recordIngressBytesInternal(InternalStats& stats, size_t ingressBytes) { stats.totalIngressBytes.add(ingressBytes); } void PieceWiseCacheStats::recordIngressBytes( size_t ingressBytes, const std::vector<std::string>& statsAggFields) { recordStats(recordIngressBytesInternal, statsAggFields, ingressBytes); } util::PercentileStats& PieceWiseCacheStats::getLatencyStatsObject() { return reqLatencyStats_; } void PieceWiseCacheStats::renderStats(uint64_t elapsedTimeNs, std::ostream& out) const { out << std::endl << "== PieceWiseReplayGenerator Stats ==" << std::endl; const double elapsedSecs = elapsedTimeNs / static_cast<double>(1e9); // Output the overall stats out << "= Overall stats =" << std::endl; renderStatsInternal(stats_, elapsedSecs, out); // request latency out << "= Request Latency =" << std::endl; folly::StringPiece latCat = "Total Request Latency"; auto fmtLatency = [&](folly::StringPiece cat, folly::StringPiece pct, uint64_t diffNanos) { double diffUs = static_cast<double>(diffNanos) / 1000.0; out << folly::sformat("{:20} {:8} : {:>10.2f} us\n", cat, pct, diffUs); }; auto ret = reqLatencyStats_.estimate(); fmtLatency(latCat, "avg", ret.avg); fmtLatency(latCat, "p50", ret.p50); fmtLatency(latCat, "p90", ret.p90); fmtLatency(latCat, "p99", ret.p99); fmtLatency(latCat, "p999", ret.p999); fmtLatency(latCat, "p9999", ret.p9999); fmtLatency(latCat, "p99999", ret.p99999); fmtLatency(latCat, "p999999", ret.p999999); fmtLatency(latCat, "p100", ret.p100); // Output stats broken down by extra field for (const auto& [fieldNum, fieldValues] : extraStatsIndexM_) { out << "= Breakdown stats for extra field " << fieldNum << " =" << std::endl; for (const auto& [fieldValue, fieldStatIdx] : fieldValues) { out << "Stats for field value " << fieldValue << ": " << std::endl; renderStatsInternal(extraStatsV_[fieldStatIdx], elapsedSecs, out); } } } void PieceWiseCacheStats::renderStats(uint64_t /* elapsedTimeNs */, folly::UserCounters& counters) const { auto ret = reqLatencyStats_.estimate(); counters["request_latency_p99"] = ret.p99; } void PieceWiseCacheStats::renderWindowStats(double elapsedSecs, std::ostream& out) const { auto windowTs = lastWindowStats_.getTimestamps(); out << std::endl << "== PieceWiseReplayGenerator Stats in Recent Time Window (" << windowTs.first << " - " << windowTs.second << ") ==" << std::endl; renderStatsInternal(lastWindowStats_, elapsedSecs, out); lastWindowStats_.reset(); } void PieceWiseCacheStats::renderStatsInternal(const InternalStats& stats, double elapsedSecs, std::ostream& out) { out << folly::sformat("{:10}: {:.2f} million", "Total Processed Samples", stats.objGets.get() / 1e6) << std::endl; auto safeDiv = [](auto nr, auto dr) { return dr == 0 ? 0.0 : 100.0 * nr / dr; }; const double getBytesGB = stats.getBytes.get() / kGB; const double getBytesGBPerSec = getBytesGB / elapsedSecs; const double getBytesSuccessRate = safeDiv(stats.getHitBytes.get(), stats.getBytes.get()); const double getBytesFullSuccessRate = safeDiv(stats.getFullHitBytes.get(), stats.getBytes.get()); const double getBodyBytesGB = stats.getBodyBytes.get() / kGB; const double getBodyBytesGBPerSec = getBodyBytesGB / elapsedSecs; const double getBodyBytesSuccessRate = safeDiv(stats.getHitBodyBytes.get(), stats.getBodyBytes.get()); const double getBodyBytesFullSuccessRate = safeDiv(stats.getFullHitBodyBytes.get(), stats.getBodyBytes.get()); const uint64_t get = stats.objGets.get(); const uint64_t getPerSec = util::narrow_cast<uint64_t>(stats.objGets.get() / elapsedSecs); const double getSuccessRate = safeDiv(stats.objGetHits.get(), stats.objGets.get()); const double getFullSuccessRate = safeDiv(stats.objGetFullHits.get(), stats.objGets.get()); const double egressBytesGB = stats.totalEgressBytes.get() / kGB; const double egressBytesGBPerSec = egressBytesGB / elapsedSecs; const double ingressBytesGB = stats.totalIngressBytes.get() / kGB; const double ingressBytesGBPerSec = ingressBytesGB / elapsedSecs; const double ingressEgressRatio = safeDiv(static_cast<int64_t>(stats.totalEgressBytes.get()) - static_cast<int64_t>(stats.totalIngressBytes.get()), stats.totalEgressBytes.get()); auto outFn = [&out](folly::StringPiece k0, double v0, folly::StringPiece k1, double v1, folly::StringPiece k2, double v2, folly::StringPiece k3, double v3) { out << folly::sformat( "{:12}: {:6.2f} GB, {:18}: {:6.2f} GB/s, {:8}: {:6.2f}%, {:10}: " "{:6.2f}%", k0, v0, k1, v1, k2, v2, k3, v3) << std::endl; }; outFn("getBytes", getBytesGB, "getBytesPerSec", getBytesGBPerSec, "success", getBytesSuccessRate, "full success", getBytesFullSuccessRate); outFn("getBodyBytes", getBodyBytesGB, "getBodyBytesPerSec", getBodyBytesGBPerSec, "success", getBodyBytesSuccessRate, "full success", getBodyBytesFullSuccessRate); out << folly::sformat( "{:12}: {:6.2f} GB, {:12}: {:6.2f} GB, {:18}: {:6.2f} GB/s, " "{:18}: {:6.2f} GB/s, {:8}: {:6.2f}%", "egressBytes", egressBytesGB, "ingressBytes", ingressBytesGB, "egressBytesPerSec", egressBytesGBPerSec, "ingressBytesPerSec", ingressBytesGBPerSec, "ingressEgressratio", ingressEgressRatio) << std::endl; out << folly::sformat( "{:12}: {:10,}, {:18}: {:8,} /s, {:8}: {:6.2f}%, {:10}: {:6.2f}%", "objectGet", get, "objectGetPerSec", getPerSec, "success", getSuccessRate, "full success", getFullSuccessRate) << std::endl; } PieceWiseReqWrapper::PieceWiseReqWrapper( uint64_t cachePieceSize, uint64_t timestamp, uint64_t reqId, OpType opType, folly::StringPiece key, size_t fullContentSize, size_t responseHeaderSize, folly::Optional<uint64_t> rangeStart, folly::Optional<uint64_t> rangeEnd, uint32_t ttl, std::vector<std::string>&& statsAggFieldV, std::unordered_map<std::string, std::string>&& admFeatureM) : baseKey(GenericPieces::escapeCacheKey(key.str())), pieceKey(baseKey), sizes(1), req(pieceKey, sizes.begin(), sizes.end(), opType, ttl, reqId, admFeatureM), requestRange(rangeStart, rangeEnd), headerSize(responseHeaderSize), fullObjectSize(fullContentSize), statsAggFields(statsAggFieldV) { req.timestamp = timestamp; if (fullContentSize < cachePieceSize) { // The entire object is stored along with the response header. // We always fetch the full content first, then trim the // response if it's range request sizes[0] = fullContentSize + responseHeaderSize; } else { // Piecewise caching cachePieces = std::make_unique<GenericPieces>(baseKey, cachePieceSize, kCachePieceGroupSize / cachePieceSize, fullContentSize, &requestRange); // Header piece is the first piece pieceKey = GenericPieces::createPieceHeaderKey(baseKey); sizes[0] = responseHeaderSize; isHeaderPiece = true; } } PieceWiseReqWrapper::PieceWiseReqWrapper(const PieceWiseReqWrapper& other) : baseKey(other.baseKey), pieceKey(other.pieceKey), sizes(other.sizes), req(pieceKey, sizes.begin(), sizes.end(), other.req.getOp(), other.req.ttlSecs, other.req.requestId.value(), other.req.admFeatureMap), requestRange(other.requestRange), isHeaderPiece(other.isHeaderPiece), headerSize(other.headerSize), fullObjectSize(other.fullObjectSize), statsAggFields(other.statsAggFields) { if (other.cachePieces) { cachePieces = std::make_unique<GenericPieces>( baseKey, other.cachePieces->getPieceSize(), kCachePieceGroupSize / other.cachePieces->getPieceSize(), fullObjectSize, &requestRange); cachePieces->setFetchIndex(other.cachePieces->getCurFetchingPieceIndex()); } } void PieceWiseCacheAdapter::recordNewReq(PieceWiseReqWrapper& rw) { // Start tracking request latency rw.latencyTracker_ = std::make_unique<util::LatencyTracker>(stats_.getLatencyStatsObject()); // Record the bytes that we are going to fetch, and to egress. // getBytes and getBodyBytes are what we will fetch from either cache or // upstream. egressBytes are what we will egress to client. size_t getBytes; size_t getBodyBytes; size_t egressBytes; // Calculate getBytes and getBodyBytes. // We always fetch complete piece or object regardless of range boundary if (rw.cachePieces) { // Fetch all relevant pieces, e.g., for range request of 5-150k, we // will fetch 3 pieces (assuming 64k piece): 0-64k, 64-128k, 128-192k getBodyBytes = rw.cachePieces->getTotalSize(); } else { // We fetch the whole object no matter it's range request or not. getBodyBytes = rw.fullObjectSize; } getBytes = getBodyBytes + rw.headerSize; // Calculate egressBytes. if (rw.requestRange.getRequestRange()) { auto rangeStart = rw.requestRange.getRequestRange()->first; auto rangeEnd = rw.requestRange.getRequestRange()->second; auto rangeSize = rangeEnd ? (*rangeEnd - rangeStart + 1) : (rw.fullObjectSize - rangeStart); egressBytes = rangeSize + rw.headerSize; } else { egressBytes = rw.fullObjectSize + rw.headerSize; } stats_.recordAccess(rw.req.timestamp, getBytes, getBodyBytes, egressBytes, rw.statsAggFields); } bool PieceWiseCacheAdapter::processReq(PieceWiseReqWrapper& rw, OpResultType result) { if (rw.cachePieces) { // Object is stored in pieces return updatePieceProcessing(rw, result); } else { // Object is not stored in pieces, and it should be stored along // with the response header return updateNonPieceProcessing(rw, result); } } bool PieceWiseCacheAdapter::updatePieceProcessing(PieceWiseReqWrapper& rw, OpResultType result) { // we are only done if we have got everything. bool done = false; if (result == OpResultType::kGetHit || result == OpResultType::kSetSuccess || result == OpResultType::kSetFailure) { // The piece index we need to fetch next auto nextPieceIndex = rw.cachePieces->getCurFetchingPieceIndex(); // Record the cache hit stats if (result == OpResultType::kGetHit) { if (rw.isHeaderPiece) { stats_.recordPieceHeaderHit(rw.sizes[0], rw.statsAggFields); } else { auto resultPieceIndex = nextPieceIndex - 1; // We always fetch a complete piece. auto pieceSize = rw.cachePieces->getSizeOfAPiece(resultPieceIndex); stats_.recordPieceBodyHit(pieceSize, rw.statsAggFields); } } // For pieces that are beyond pieces number limit (maxCachePieces_), // we don't store them if (rw.cachePieces->isPieceWithinBound(nextPieceIndex) && nextPieceIndex < maxCachePieces_) { // First set the correct key. Header piece has already been fetched, // this is now a body piece. rw.pieceKey = GenericPieces::createPieceKey( rw.baseKey, nextPieceIndex, rw.cachePieces->getPiecesPerGroup()); // Set the size of the piece rw.sizes[0] = rw.cachePieces->getSizeOfAPiece(nextPieceIndex); if (result == OpResultType::kGetHit) { // Fetch next piece rw.req.setOp(OpType::kGet); } else { // Once we start to set a piece, we set all subsequent pieces rw.req.setOp(OpType::kSet); } // Update the piece fetch index rw.isHeaderPiece = false; rw.cachePieces->updateFetchIndex(); } else { if (result == OpResultType::kGetHit) { if (!rw.cachePieces->isPieceWithinBound(nextPieceIndex)) { // We have got all the pieces that are requested, record the full // cache hit stats auto totalSize = rw.cachePieces->getTotalSize(); stats_.recordPieceFullHit(rw.headerSize, totalSize, rw.statsAggFields); } else { // The remaining pieces are beyond maxCachePieces_, we don't store // them in cache and fetch them from upstream directly if (nextPieceIndex >= maxCachePieces_) { stats_.recordIngressBytes( rw.headerSize + rw.cachePieces->getRemainingBytes(), rw.statsAggFields); } } } // we are done done = true; } } else if (result == OpResultType::kGetMiss) { // Record ingress bytes since we will fetch the bytes from upstream. size_t ingressBytes; if (rw.isHeaderPiece) { ingressBytes = rw.headerSize + rw.cachePieces->getRemainingBytes(); } else { // Note we advance the piece index ahead of time, so // getCurFetchingPieceIndex() returns the next piece index auto missPieceIndex = rw.cachePieces->getCurFetchingPieceIndex() - 1; ingressBytes = rw.headerSize + rw.cachePieces->getSizeOfAPiece(missPieceIndex) + rw.cachePieces->getRemainingBytes(); } stats_.recordIngressBytes(ingressBytes, rw.statsAggFields); // Perform set operation next for the current piece rw.req.setOp(OpType::kSet); } else if (result == OpResultType::kSetSkip) { // No need to set subsequent pieces. done = true; } else { XLOG(INFO) << "Unsupported OpResultType: " << (int)result; } return done; } bool PieceWiseCacheAdapter::updateNonPieceProcessing(PieceWiseReqWrapper& rw, OpResultType result) { // we are only done if we got everything. bool done = false; if (result == OpResultType::kGetHit || result == OpResultType::kSetSuccess || result == OpResultType::kSetFailure || result == OpResultType::kSetSkip) { // Record the cache hit stats if (result == OpResultType::kGetHit) { size_t hitBytes = rw.sizes[0]; size_t hitBodyBytes = rw.sizes[0] - rw.headerSize; stats_.recordNonPieceHit(hitBytes, hitBodyBytes, rw.statsAggFields); } // we are done done = true; } else if (result == OpResultType::kGetMiss) { // Record ingress bytes since we will fetch the bytes from upstream. stats_.recordIngressBytes(rw.sizes[0], rw.statsAggFields); // Perform set operation next rw.req.setOp(OpType::kSet); } else { XLOG(INFO) << "Unsupported OpResultType: " << (int)result; } return done; } } // namespace cachebench } // namespace cachelib } // namespace facebook