cachelib/cachebench/workload/PieceWiseReplayGenerator.cpp (173 lines of code) (raw):
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "cachelib/cachebench/workload/PieceWiseReplayGenerator.h"
#include "cachelib/cachebench/util/Exceptions.h"
#include "folly/String.h"
namespace {
constexpr uint32_t kProducerConsumerWaitTimeUs = 5;
} // namespace
namespace facebook {
namespace cachelib {
namespace cachebench {
const Request& PieceWiseReplayGenerator::getReq(
uint8_t, std::mt19937_64&, std::optional<uint64_t> lastRequestId) {
auto& activeReqQ = getTLReqQueue();
// Spin until the queue has a value
while (activeReqQ.isEmpty()) {
if (isEndOfFile_.load(std::memory_order_relaxed) || shouldShutdown()) {
throw cachelib::cachebench::EndOfTrace("");
} else {
// Wait a while to allow traceGenThread_ to process new samples.
queueConsumerWaitCounts_.inc();
std::this_thread::sleep_for(
std::chrono::microseconds(kProducerConsumerWaitTimeUs));
}
}
auto reqWrapper = activeReqQ.frontPtr();
bool isNewReq = true;
if (lastRequestId) {
XCHECK_LE(*lastRequestId, reqWrapper->req.requestId.value());
if (*lastRequestId == reqWrapper->req.requestId.value()) {
isNewReq = false;
}
}
// Record the byte wise and object wise stats that we will fetch
// when it's a new request
if (isNewReq) {
pieceCacheAdapter_.recordNewReq(*reqWrapper);
}
return reqWrapper->req;
}
void PieceWiseReplayGenerator::notifyResult(uint64_t requestId,
OpResultType result) {
auto& activeReqQ = getTLReqQueue();
auto& rw = *(activeReqQ.frontPtr());
XCHECK_EQ(rw.req.requestId.value(), requestId);
auto done = pieceCacheAdapter_.processReq(rw, result);
if (done) {
activeReqQ.popFront();
}
}
void PieceWiseReplayGenerator::getReqFromTrace() {
std::string line;
auto partialFieldCount = SampleFields::TOTAL_DEFINED_FIELDS +
config_.replayGeneratorConfig.numAggregationFields;
auto totalFieldCount =
partialFieldCount + config_.replayGeneratorConfig.numExtraFields;
while (true) {
if (!std::getline(infile_, line)) {
if (repeatTraceReplay_) {
XLOG_EVERY_MS(
INFO, 100'000,
"Reached the end of trace file. Restarting from beginning.");
resetTraceFileToBeginning();
continue;
}
isEndOfFile_.store(true, std::memory_order_relaxed);
break;
}
samples_.inc();
try {
std::vector<folly::StringPiece> fields;
folly::split(",", line, fields);
if (fields.size() != totalFieldCount) {
invalidSamples_.inc();
continue;
}
auto shard = getShard(fields[SampleFields::CACHE_KEY]);
if (threadFinished_[shard].load(std::memory_order_relaxed)) {
if (shouldShutdown()) {
XLOG(INFO) << "Forced to stop, terminate reading trace file!";
return;
}
XLOG_EVERY_MS(INFO, 100'000,
folly::sformat("Thread {} finish, skip", shard));
continue;
}
auto fullContentSizeT =
folly::tryTo<size_t>(fields[SampleFields::OBJECT_SIZE]);
auto responseSizeT =
folly::tryTo<size_t>(fields[SampleFields::RESPONSE_SIZE]);
auto responseHeaderSizeT =
folly::tryTo<size_t>(fields[SampleFields::RESPONSE_HEADER_SIZE]);
auto ttlT = folly::tryTo<uint32_t>(fields[SampleFields::TTL]);
// Invalid sample: cacheKey is empty, objectSize is not positive,
// responseSize is not positive, responseHeaderSize is not positive,
// ttl is not positive
if (!fields[1].compare("-") || !fields[1].compare("") ||
!fullContentSizeT.hasValue() || fullContentSizeT.value() == 0 ||
!responseSizeT.hasValue() || responseSizeT.value() == 0 ||
!responseHeaderSizeT.hasValue() || responseHeaderSizeT.value() == 0 ||
!ttlT.hasValue() || ttlT.value() == 0) {
invalidSamples_.inc();
continue;
}
// Convert timestamp to seconds.
uint64_t timestampRaw =
folly::tryTo<size_t>(fields[SampleFields::TIMESTAMP]).value();
uint64_t timestampSeconds = timestampRaw / timestampFactor_;
auto fullContentSize = fullContentSizeT.value();
auto responseSize = responseSizeT.value();
auto responseHeaderSize = responseHeaderSizeT.value();
auto ttl = ttlT.value();
// When responseSize and responseHeaderSize is equal, responseBodySize
// becomes 0 which can make range calculation incorrect. Simply ignore
// such requests for now.
// TODO: better handling non-GET requests
if (responseSize == responseHeaderSize) {
nonGetSamples_.inc();
continue;
}
auto parseRangeField = [](folly::StringPiece range, size_t contentSize) {
folly::Optional<uint64_t> result;
// Negative value means it's not range request
auto val = folly::to<int64_t>(range);
if (val >= 0) {
// range index can not be larger than content size
result = std::min(static_cast<size_t>(val), contentSize - 1);
} else {
result = folly::none;
}
return result;
};
auto rangeStart =
parseRangeField(fields[SampleFields::RANGE_START], fullContentSize);
auto rangeEnd =
parseRangeField(fields[SampleFields::RANGE_END], fullContentSize);
// Perform range size check, and rectify the range when responseBodySize
// is obviously too small.
auto responseBodySize = responseSize - responseHeaderSize;
if (!rangeStart.has_value()) {
// No range request setting, but responseBodySize is smaller than
// fullContentSize. Convert the sample to range request.
if (responseBodySize < fullContentSize) {
rangeStart = 0;
rangeEnd = responseBodySize - 1;
}
} else {
// The sample is range request, but range size is larger than
// responseBodySize. Rectify the range end.
size_t rangeSize = rangeEnd ? (*rangeEnd - *rangeStart + 1)
: (fullContentSize - *rangeStart);
if (responseBodySize < rangeSize) {
rangeEnd = responseBodySize + *rangeStart - 1;
}
}
std::vector<std::string> statsAggFields;
for (size_t i = SampleFields::TOTAL_DEFINED_FIELDS; i < partialFieldCount;
++i) {
statsAggFields.push_back(fields[i].str());
}
// Admission policy related fields: feature name --> feature value
std::unordered_map<std::string, std::string> admFeatureMap;
if (config_.replayGeneratorConfig.mlAdmissionConfig) {
for (const auto& [featureName, index] :
config_.replayGeneratorConfig.mlAdmissionConfig->numericFeatures) {
XCHECK_LT(index, totalFieldCount);
admFeatureMap.emplace(featureName, fields[index].str());
}
for (const auto& [featureName, index] :
config_.replayGeneratorConfig.mlAdmissionConfig
->categoricalFeatures) {
XCHECK_LT(index, totalFieldCount);
admFeatureMap.emplace(featureName, fields[index].str());
}
}
// Spin until the queue has room
while (!activeReqQ_[shard]->write(config_.cachePieceSize,
timestampSeconds,
nextReqId_,
OpType::kGet, // Only support get from
// trace for now
fields[SampleFields::CACHE_KEY],
fullContentSize,
responseHeaderSize,
rangeStart,
rangeEnd,
ttl,
std::move(statsAggFields),
std::move(admFeatureMap))) {
if (shouldShutdown()) {
XLOG(INFO) << "Forced to stop, terminate reading trace file!";
return;
}
queueProducerWaitCounts_.inc();
std::this_thread::sleep_for(
std::chrono::microseconds(kProducerConsumerWaitTimeUs));
if (threadFinished_[shard].load(std::memory_order_relaxed)) {
XLOG_EVERY_MS(INFO, 100'000,
folly::sformat("Thread {} finish, skip", shard));
break;
}
}
++nextReqId_;
} catch (const std::exception& e) {
XLOG(ERR) << "Processing line: " << line
<< ", causes exception: " << e.what();
}
}
}
uint32_t PieceWiseReplayGenerator::getShard(folly::StringPiece key) {
if (mode_ == ReplayGeneratorConfig::SerializeMode::strict) {
return folly::hash::SpookyHashV2::Hash32(key.begin(), key.size(), 0) %
numShards_;
} else {
// TODO: implement the relaxed mode
return folly::Random::rand32(numShards_);
}
}
} // namespace cachebench
} // namespace cachelib
} // namespace facebook