/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/Benchmark.h>
#include <folly/Optional.h>
#include <folly/Range.h>
#include <mutex>
#include "cachelib/cachebench/util/Request.h"
#include "cachelib/common/AtomicCounter.h"
#include "cachelib/common/PercentileStats.h"
#include "cachelib/common/piecewise/GenericPieces.h"
namespace facebook {
namespace cachelib {
namespace cachebench {
// physical grouping size (in bytes) for contiguous pieces (16 MiB)
constexpr uint64_t kCachePieceGroupSize = 16777216;
class PieceWiseCacheStats {
public:
struct InternalStats {
// Byte wise stats:
// getBytes: record both body and header bytes we fetch from cache and
// upstream;
// getHitBytes: record both body and header bytes we fetch from cache;
// getFullHitBytes: similar to getHitBytes, but only if all bytes for the
// request are cache hits, i.e., excluding partial hits
// getBodyBytes: similar to getBytes, but only for body bytes
// getHitBodyBytes: similar to getHitBytes, but only for body bytes
// getFullHitBodyBytes: similar to getFullHitBytes, but only for body bytes
//
// totalIngressBytes: record both body and header bytes we fetch from
// upstream. Note we fetch both the header (which might
// have been a hit) and the remaining part of missing
// pieces
// totalEgressBytes: record both body and header bytes we send out to
// downstream. Note we trim the bytes fetched from cache
// and/or upstream so that the egress bytes match the
// request range
//
// For example, for a range request of 0-50k (assuming 64k piece size), we
// will fetch the first complete piece, but egress only bytes 0-50k; so
// getBytes = 64k + header, getBodyBytes = 64k, totalEgressBytes = 50001
AtomicCounter getBytes{0};
AtomicCounter getHitBytes{0};
AtomicCounter getFullHitBytes{0};
AtomicCounter getBodyBytes{0};
AtomicCounter getHitBodyBytes{0};
AtomicCounter getFullHitBodyBytes{0};
AtomicCounter totalIngressBytes{0};
AtomicCounter totalEgressBytes{0};
// Object-wise stats: objGets counts every object get. For an object get,
// objGetFullHits is incremented only when all pieces are cache hits, while
// objGetHits is incremented for partial hits as well.
AtomicCounter objGets{0};
AtomicCounter objGetHits{0};
AtomicCounter objGetFullHits{0};
void reset() {
getBytes.set(0);
getHitBytes.set(0);
getFullHitBytes.set(0);
getBodyBytes.set(0);
getHitBodyBytes.set(0);
getFullHitBodyBytes.set(0);
totalIngressBytes.set(0);
totalEgressBytes.set(0);
objGets.set(0);
objGetHits.set(0);
objGetFullHits.set(0);
{
std::lock_guard<std::mutex> lck(tsMutex_);
startTimestamp_ = 0;
endTimestamp_ = 0;
}
}
// Update the start/end timestamps of the stats window with the given
// trace timestamp.
void updateTimestamp(uint64_t timestamp);
// Return the {startTimestamp_, endTimestamp_} pair of the stats window.
std::pair<uint64_t, uint64_t> getTimestamps();
private:
// The starting time and end time for counting the stats.
std::mutex tsMutex_;
uint64_t startTimestamp_{0};
uint64_t endTimestamp_{0};
};
// @param numAggregationFields: # of aggregation fields in trace sample
// @param statsPerAggField: list of values we track for each aggregation field
// Example: numAggregationFields: 2; statsPerAggField: {0: [X, Y]; 1: [Z]}.
// For these inputs, we track the aggregated stats for samples that have
// value X for aggregation field 0, for samples that have value Y for field
// 0, and for samples that have value Z for field 1
explicit PieceWiseCacheStats(
uint32_t numAggregationFields,
const std::unordered_map<uint32_t, std::vector<std::string>>&
statsPerAggField);
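// A minimal usage sketch (the field values "X", "Y", "Z" below are
// illustrative, mirroring the example above):
//
//   std::unordered_map<uint32_t, std::vector<std::string>> statsPerAggField{
//       {0, {"X", "Y"}}, {1, {"Z"}}};
//   PieceWiseCacheStats stats(2 /* numAggregationFields */, statsPerAggField);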
// Record both byte-wise and object-wise stats for a get request access
void recordAccess(uint64_t timestamp,
size_t getBytes,
size_t getBodyBytes,
size_t bytesEgress,
const std::vector<std::string>& statsAggFields);
// Record cache hit for objects that are not stored in pieces
void recordNonPieceHit(size_t hitBytes,
size_t hitBodyBytes,
const std::vector<std::string>& statsAggFields);
// Record cache hit for a header piece
void recordPieceHeaderHit(size_t pieceBytes,
const std::vector<std::string>& statsAggFields);
// Record cache hit for a body piece
void recordPieceBodyHit(size_t pieceBytes,
const std::vector<std::string>& statsAggFields);
// Record full hit stats for piecewise caching
void recordPieceFullHit(size_t headerBytes,
size_t bodyBytes,
const std::vector<std::string>& statsAggFields);
// Record ingress bytes fetched from upstream
void recordIngressBytes(size_t ingressBytes,
const std::vector<std::string>& statsAggFields);
// Return the stats object for tracking request latencies.
util::PercentileStats& getLatencyStatsObject();
// Render the overall stats to the given output stream or counters.
void renderStats(uint64_t elapsedTimeNs, std::ostream& out) const;
void renderStats(uint64_t /* elapsedTimeNs */,
                 folly::UserCounters& counters) const;
const InternalStats& getInternalStats() const { return stats_; }
// Render the stats accumulated since the last window render.
void renderWindowStats(double elapsedSecs, std::ostream& out) const;
private:
// Overall hit rate stats
InternalStats stats_;
// The stats for the data since the last time we rendered.
mutable InternalStats lastWindowStats_;
// Collect breakdown stats in the following maps. The keys of these maps are
// added in the constructor and never changed afterwards, so the map
// structure is fixed after construction, and we can call const methods from
// multiple threads without a lock.
//
// extraStatsIndexM_: extra field index --> field value to track --> index
// into extraStatsV_
std::map<uint32_t, std::map<std::string, uint32_t>> extraStatsIndexM_;
std::vector<InternalStats> extraStatsV_;
// Latency stats
mutable util::PercentileStats reqLatencyStats_;
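// Apply func to the overall stats, the current-window stats, and each
// per-aggregation-field stats entry whose tracked value matches the
// corresponding entry in fields.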
template <typename F, typename... Args>
void recordStats(F& func,
const std::vector<std::string>& fields,
Args... args) {
func(stats_, args...);
func(lastWindowStats_, args...);
for (const auto& [fieldNum, statM] : extraStatsIndexM_) {
XDCHECK_LT(fieldNum, fields.size());
auto it = statM.find(fields[fieldNum]);
if (it != statM.end()) {
func(extraStatsV_[it->second], args...);
}
}
}
static void recordAccessInternal(InternalStats& stats,
size_t getBytes,
size_t getBodyBytes,
size_t egressBytes);
static void recordNonPieceHitInternal(InternalStats& stats,
size_t hitBytes,
size_t hitBodyBytes);
static void recordPieceHeaderHitInternal(InternalStats& stats,
size_t pieceBytes);
static void recordPieceBodyHitInternal(InternalStats& stats,
size_t pieceBytes);
static void recordPieceFullHitInternal(InternalStats& stats,
size_t headerBytes,
size_t bodyBytes);
static void recordIngressBytesInternal(InternalStats& stats,
size_t ingressBytes);
static void renderStatsInternal(const InternalStats& stats,
double elapsedSecs,
std::ostream& out);
};
// For piecewise caching, an object is stored as multiple pieces (one header
// piece + several body pieces) if its size is larger than cachePieceSize;
// otherwise, it's stored as a single piece.
// Therefore, a raw request fans out into one or more piece requests against
// the cache.
// This class wraps the struct Request (which represents a single request
// against the cache), and associates the raw request (identified by baseKey)
// with its current piece request (identified by pieceKey).
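// A worked example (illustrative; assuming a 64 KiB cachePieceSize): a
// 200 KiB object is stored as 1 header piece plus 4 body pieces (three full
// 64 KiB pieces and one 8 KiB tail piece), while a 50 KiB object is stored
// as a single piece.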
struct PieceWiseReqWrapper {
const std::string baseKey;
std::string pieceKey;
std::vector<size_t> sizes;
// Its internal key is a reference to pieceKey.
// Immutable except for the op field
Request req;
std::unique_ptr<GenericPieces> cachePieces;
const RequestRange requestRange;
// whether the current pieceKey is a header piece or a body piece; mutable
bool isHeaderPiece{false};
// response header size
const size_t headerSize;
// The size of the complete object, excluding response header.
const size_t fullObjectSize;
// Additional stats aggregation fields for this request sample other
// than the defined SampleFields
const std::vector<std::string> statsAggFields;
// Tracker to record the start/end of the request. Initialize it at the
// start of request processing.
std::unique_ptr<util::LatencyTracker> latencyTracker_;
// @param cachePieceSize: byte size of a cache piece
// @param timestamp: timestamp of the raw request
// @param reqId: the unique request id for the raw request
// @param opType: operation type of the raw request
// @param key: base key for the raw request
// @param fullContentSize: byte size of the full content/object
// @param responseHeaderSize: response header size for the request
// @param rangeStart: start of the range if it's a range request
// @param rangeEnd: end of the range if it's a range request
// @param ttl: ttl of the content for caching
// @param statsAggFieldV: extra fields used for stats aggregation
// @param admFeatureM: features map for admission policy
explicit PieceWiseReqWrapper(
uint64_t cachePieceSize,
uint64_t timestamp,
uint64_t reqId,
OpType opType,
folly::StringPiece key,
size_t fullContentSize,
size_t responseHeaderSize,
folly::Optional<uint64_t> rangeStart,
folly::Optional<uint64_t> rangeEnd,
uint32_t ttl,
std::vector<std::string>&& statsAggFieldV,
std::unordered_map<std::string, std::string>&& admFeatureM);
PieceWiseReqWrapper(const PieceWiseReqWrapper& other);
};
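// A minimal construction sketch (hypothetical values): wrapping a range GET
// for bytes 0-50000 of a 1 MiB object cached in 64 KiB pieces:
//
//   PieceWiseReqWrapper rw(65536 /* cachePieceSize */,
//                          1625000000 /* timestamp */,
//                          1 /* reqId */,
//                          OpType::kGet,
//                          "/example/content" /* key, illustrative */,
//                          1048576 /* fullContentSize */,
//                          500 /* responseHeaderSize */,
//                          0 /* rangeStart */,
//                          50000 /* rangeEnd */,
//                          3600 /* ttl */,
//                          {} /* statsAggFieldV */,
//                          {} /* admFeatureM */);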
// The class adapts/updates a PieceWiseReqWrapper to its next operation and/or
// next piece based on the piecewise logic, and maintains PieceWiseCacheStats
// for all processing.
// For piecewise caching, we always fetch/store complete pieces from/to
// the cache, and then trim the fetched pieces accordingly on egress to match
// the request range.
class PieceWiseCacheAdapter {
public:
// @param maxCachePieces: maximum # of cache pieces we store for a single
// content
// @param numAggregationFields: # of aggregation fields in trace sample
// @param statsPerAggField: list of values we track for each aggregation field
explicit PieceWiseCacheAdapter(
uint64_t maxCachePieces,
uint32_t numAggregationFields,
const std::unordered_map<uint32_t, std::vector<std::string>>&
statsPerAggField)
: stats_(numAggregationFields, statsPerAggField),
maxCachePieces_(maxCachePieces) {}
// Record corresponding stats for a new request.
void recordNewReq(PieceWiseReqWrapper& rw);
// Process the request rw: update rw to its next operation and record
// stats accordingly.
// @return true if all ops for the request rw are done.
bool processReq(PieceWiseReqWrapper& rw, OpResultType result);
const PieceWiseCacheStats& getStats() const { return stats_; }
private:
// Called when rw is piecewise cached. The method updates rw to the next
// operation and/or next piece based on the piece logic, and records stats.
// @return true if all the ops for all pieces behind the request are done.
bool updatePieceProcessing(PieceWiseReqWrapper& rw, OpResultType result);
// Called when rw is not piecewise cached. The method updates rw to the
// next operation, and records the stats.
// @return true if all ops for the request are done.
bool updateNonPieceProcessing(PieceWiseReqWrapper& rw, OpResultType result);
PieceWiseCacheStats stats_;
const uint64_t maxCachePieces_;
};
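// A minimal driver-loop sketch (hypothetical; `rw` construction, the
// statsPerAggField map, and executeAgainstCache() are assumed to exist
// elsewhere and are not part of this header):
//
//   PieceWiseCacheAdapter adapter(32 /* maxCachePieces */,
//                                 2 /* numAggregationFields */,
//                                 statsPerAggField);
//   adapter.recordNewReq(rw);
//   OpResultType result = executeAgainstCache(rw.req);
//   while (!adapter.processReq(rw, result)) {
//     // processReq advanced rw to its next piece/operation; execute it.
//     result = executeAgainstCache(rw.req);
//   }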
} // namespace cachebench
} // namespace cachelib
} // namespace facebook