// src/common/monitor/Recorder.cc
#include "common/monitor/Recorder.h"
#include <algorithm>
#include <folly/Likely.h>
#include <folly/logging/xlog.h>
#include <optional>
#include <string>
#include <string_view>
#include <utility>
#include "common/monitor/Monitor.h"
#include "common/monitor/Sample.h"
namespace hf3fs::monitor {
using namespace std::chrono_literals;
// Process-wide host/pod names, attached as the "host" and "pod" tags on every emitted
// sample and assigned by Recorder::setHostname.
// They must default to EMPTY strings. The previous `std::string{0}` selected the
// initializer_list<char> constructor and produced a 1-character string containing '\0',
// which leaked into the tags of samples collected before setHostname() was called.
static std::string gHostname;
static std::string gPodname;
// Per-thread lookup caches indexed directly by status code, with (1 << 16) == 65536 slots
// each. OperationRecorderT::Guard::reportWithCode fills a slot lazily the first time a code
// is seen on a thread, so StatusCode::toString / TagSet::create run at most once per code
// per thread.
// NOTE(review): every thread that touches these carries 2 x 65536 slots of TLS; if thread
// counts are high, a lazily populated map may be cheaper.
thread_local std::array<String, (1 << 16)> errorCodeStrings;
thread_local std::array<std::optional<TagSet>, (1 << 16)> errorCodeTagSets;
// Sanity check on destruction: if `register_` is still set, the recorder was not
// unregistered before being destroyed, so report it at DFATAL together with its name and
// JSON-encoded tags.
Recorder::~Recorder() {
  if (!register_) {
    return;
  }
  XLOGF(DFATAL,
        "Recorder forgot to unregister self with name {} and tags {}",
        name_,
        serde::toJsonString(tag_.asMap()));
}
// Appends this recorder's own samples, then the samples of every tagged child in map_.
// When `cleanInactive` is true, each child's `active_` flag is consumed (reset to false).
// Children that were not marked active since the previous pass are erased from map_.
// When `cleanInactive` is false, the flags are left untouched and nothing is erased.
void Recorder::collectAndClean(std::vector<Sample> &samples, bool cleanInactive) {
  collect(samples);
  for (auto it = map_.begin(); it != map_.end();) {
    auto &child = it->second;
    child->collect(samples);
    // Short-circuit matters: active_ is only exchanged when cleaning is requested.
    const bool dropChild = cleanInactive && !std::exchange(child->active_, false);
    if (!dropChild) {
      ++it;
      continue;
    }
    XLOGF(DBG, "Remove inactive recorder {}", child->name());
    it = map_.erase(it);
  }
}
// Replaces the process-wide host/pod names that collect() implementations in this file
// attach as the "host" and "pod" tags.
// NOTE(review): these globals are written here without synchronization while collectors
// read them; presumably this is only called during startup — confirm.
void Recorder::setHostname(std::string hostname, std::string podname) {
  gHostname.swap(hostname);
  gPodname.swap(podname);
}
// Returns a reference to the child recorder registered under `tag`, creating it on first
// use. The child is constructed as T(parent, childTags). Here `parent` is *this viewed as
// a T, and `childTags` is this recorder's own tags with every (key, value) of `tag` added.
// Assumes the dynamic type of *this is T (or derives from it).
template <typename T>
Recorder::TagRef<T> Recorder::getRecorderWithTag(const TagSet &tag) {
  auto iter = map_.find(tag);
  if (UNLIKELY(iter == map_.end())) {
    TagSet childTags = tag_;
    for (const auto &kv : tag) {
      childTags.addTag(kv.first, kv.second);
    }
    // Downcast Recorder* -> T*. static_cast applies the correct base-to-derived pointer
    // adjustment and rejects unrelated types at compile time. reinterpret_cast would
    // silently yield a wrong pointer whenever the Recorder base is not at offset 0.
    auto newRecorder = std::make_unique<T>(*static_cast<T *>(this), childTags);
    // NOTE(review): assumes map_.insert() keeps an already-present entry. If another
    // thread inserted `tag` first, result.first refers to that entry and newRecorder is
    // discarded — confirm against map_'s type.
    auto result = map_.insert(tag, std::move(newRecorder));
    iter = std::move(result.first);
  }
  return iter;
}
// Creates a count recorder attached to the default Monitor instance.
// `tag` is a by-value sink parameter, so it is moved (not copied) into the delegated
// constructor.
template <class ThreadLocalTag>
CountRecorderWithTLSTag<ThreadLocalTag>::CountRecorderWithTLSTag(std::string_view name,
                                                                 std::optional<TagSet> tag,
                                                                 bool resetWhenCollect)
    : CountRecorderWithTLSTag(Monitor::getDefaultInstance(), name, std::move(tag), resetWhenCollect) {}
// Adds `val` to the child counter keyed by `tag`; getRecorderWithTag creates that child
// the first time the tag is seen.
template <class ThreadLocalTag>
void CountRecorderWithTLSTag<ThreadLocalTag>::addSample(int64_t val, const TagSet &tag) {
  auto child = getRecorderWithTag(tag);
  child->addSample(val);
}
// Folds the shared counter and every thread-local counter into one value. When
// resetWhenCollect_ is set the counters are drained; otherwise they are only read. A
// non-zero total is emitted as a sample tagged with this recorder's tags plus "host"/"pod",
// and the recorder is marked active.
// For draining counters with logPer30s_ enabled, the totals are also accumulated and a
// DBG3 summary is logged once per 30s window.
template <class ThreadLocalTag>
void CountRecorderWithTLSTag<ThreadLocalTag>::collect(std::vector<Sample> &samples) {
  // 1. collect for last second.
  int64_t sum = resetWhenCollect_ ? sum_.exchange(0) : sum_.load();
  for (auto &tls : tls_.accessAllThreads()) {
    sum += resetWhenCollect_ ? tls.exchange() : tls.load();
  }
  auto now = UtcClock::now();
  // The 30s summary only exists for draining counters. Only accumulate in that case:
  // a non-draining gauge would otherwise add its level into cumulativeVal_ every
  // collection with no reset, growing without bound toward signed overflow.
  const bool windowedLog = resetWhenCollect_ && logPer30s_;
  if (sum) {
    TagSet sample_tag = tag_;
    sample_tag.addTag("host", gHostname);
    sample_tag.addTag("pod", gPodname);
    samples.emplace_back(Sample{name_, std::move(sample_tag), now, sum});
    if (windowedLog) {
      cumulativeVal_ += sum;
    }
    active_ = true;
  }
  // 2. log per 30 seconds.
  if (windowedLog && now - lastLogTime_ >= 30s) {
    if (cumulativeVal_ > 0) {
      XLOGF(DBG3, "Last 30s {}: {}", name_, cumulativeVal_);
    }
    // Always start a fresh window, even when nothing was logged. Otherwise a
    // non-positive total would be carried into, and misreported as, a later window.
    cumulativeVal_ = 0;
    lastLogTime_ = now;
  }
}
// Generic count recorders have no special cleanup policy: defer entirely to
// Recorder::collectAndClean, honouring the caller's `cleanInactive` choice.
template <class ThreadLocalTag>
void CountRecorderWithTLSTag<ThreadLocalTag>::collectAndClean(std::vector<Sample> &samples, bool cleanInactive) {
  return Recorder::collectAndClean(samples, cleanInactive);
}
// Specialization for allocated-memory counters: the caller's cleanInactive request is
// ignored. Inactive tagged children are never erased for this recorder type.
template <>
void CountRecorderWithTLSTag<AllocatedMemoryCounterTag>::collectAndClean(std::vector<Sample> &samples,
                                                                          bool /*cleanInactive*/) {
  constexpr bool kCleanInactive = false;
  Recorder::collectAndClean(samples, kCleanInactive);
}
// Explicitly instantiate the count recorder for both thread-local tag types, so the
// template member definitions in this file are emitted in this translation unit.
template class CountRecorderWithTLSTag<AllocatedMemoryCounterTag>;
template class CountRecorderWithTLSTag<SharedThreadLocalTag>;
// Creates a distribution recorder attached to the default Monitor instance.
// `tag` is a by-value sink, so it is moved into the delegated constructor instead of copied.
DistributionRecorder::DistributionRecorder(std::string_view name, std::optional<TagSet> tag)
    : DistributionRecorder(Monitor::getDefaultInstance(), name, std::move(tag)) {}
void DistributionRecorder::collect(std::vector<Sample> &samples) {
// 1. collect for last second.
auto now = UtcClock::now();
auto digest = tdigest_.build();
if (!digest.empty()) {
Distribution dist;
dist.cnt = digest.count();
dist.sum = digest.sum();
dist.min = digest.min();
dist.max = digest.max();
dist.p50 = digest.estimateQuantile(0.50);
dist.p90 = digest.estimateQuantile(0.90);
dist.p95 = digest.estimateQuantile(0.95);
dist.p99 = digest.estimateQuantile(0.99);
samples.emplace_back();
samples.back().name = name_;
TagSet sample_tag = tag_;
sample_tag.addTag("host", gHostname);
sample_tag.addTag("pod", gPodname);
samples.back().tags = std::move(sample_tag);
samples.back().timestamp = now;
samples.back().value = dist;
active_ = true;
if (logPer30s_) {
cumulativeDigests_.push_back(std::move(digest));
}
}
// 2. log per 30 seconds.
if (logPer30s_ && now - lastLogTime_ >= 30s) {
if (!cumulativeDigests_.empty()) {
logPer30s(folly::TDigest::merge(folly::Range(&cumulativeDigests_[0], cumulativeDigests_.size())));
cumulativeDigests_.clear();
}
lastLogTime_ = now;
}
}
// Adds `val` to the child distribution keyed by `tag`, creating the child on first use.
void DistributionRecorder::addSample(double val, const TagSet &tag) {
  auto child = getRecorderWithTag(tag);
  child->addSample(val);
}
// Logs a DBG3 summary of `digest`: the count (truncated to an integer), then mean, min,
// max and the p50/p90/p95/p99 estimates, each with one decimal place.
void DistributionRecorder::logPer30s(const folly::TDigest &digest) {
  // Only invoked from inside the log macro, so quantiles are computed only when DBG3 is enabled.
  auto quantile = [&digest](double q) { return digest.estimateQuantile(q); };
  XLOGF(DBG3,
        "Last 30s {}: count:{} mean:{:.1f} min:{:.1f} max:{:.1f} p50:{:.1f} p90:{:.1f} p95:{:.1f} p99:{:.1f}",
        name_,
        static_cast<uint64_t>(digest.count()),
        digest.mean(),
        digest.min(),
        digest.max(),
        quantile(0.50),
        quantile(0.90),
        quantile(0.95),
        quantile(0.99));
}
// Creates a top-level latency recorder on the default Monitor. The base is constructed
// with a trailing `false`, and this constructor then calls registerRecorder() itself.
// NOTE(review): presumably the flag suppresses registration in the base so the recorder
// is registered only once it is fully constructed — confirm.
// `tag` is a by-value sink and is moved into the base instead of copied.
LatencyRecorder::LatencyRecorder(std::string_view name, std::optional<TagSet> tag)
    : DistributionRecorder(Monitor::getDefaultInstance(), name, std::move(tag), false) {
  registerRecorder();
}
LatencyRecorder::LatencyRecorder(LatencyRecorder &parent, const TagSet &tag)
: DistributionRecorder(parent.monitor_, parent.name(), tag, false) {}
// Creates a simple (count/sum/min/max) distribution recorder on the default Monitor.
// `tag` is a by-value sink and is moved into the delegated constructor instead of copied.
SimpleDistributionRecorder::SimpleDistributionRecorder(std::string_view name, std::optional<TagSet> tag)
    : SimpleDistributionRecorder(Monitor::getDefaultInstance(), name, std::move(tag)) {}
// Drains the shared and per-thread accumulators. If at least one sample was recorded
// since the previous collection, appends a Distribution sample with cnt/sum/min/max
// only (no percentiles). The sample is tagged with this recorder's tags plus "host"/"pod",
// and the recorder is marked active.
void SimpleDistributionRecorder::collect(std::vector<Sample> &samples) {
  // Shared accumulators are swapped back to their reset values: 0 for sum/count/max,
  // INT64_MAX for min. The exchanges are independent, not one atomic snapshot.
  int64_t sum = sum_.exchange(0, std::memory_order_relaxed);
  int64_t count = count_.exchange(0, std::memory_order_relaxed);
  int64_t minval = min_.exchange(std::numeric_limits<int64_t>::max(), std::memory_order_relaxed);
  // NOTE(review): max is reset to 0, so presumably a window of only negative samples
  // reports max 0 — confirm samples are expected to be non-negative.
  int64_t maxval = max_.exchange(0, std::memory_order_relaxed);
  for (auto &tls : tls_.accessAllThreads()) {
    sum += tls.exchangeSum();
    count += tls.exchangeCount();
    minval = std::min(minval, tls.exchangeMin());
    maxval = std::max(maxval, tls.exchangeMax());
  }
  auto now = UtcClock::now();
  // Gate on the sample count alone. The accumulators were already drained above, so a
  // window whose samples sum to zero (e.g. all zero-valued samples) must still be
  // reported, or those samples are silently lost. This matches DistributionRecorder,
  // which reports any non-empty window.
  if (count != 0) {
    Distribution dist;
    dist.cnt = count;
    dist.sum = sum;
    if (minval != std::numeric_limits<int64_t>::max()) {
      dist.min = minval;
    }
    dist.max = maxval;
    samples.emplace_back();
    samples.back().name = name_;
    TagSet sample_tag = tag_;
    sample_tag.addTag("host", gHostname);
    sample_tag.addTag("pod", gPodname);
    samples.back().tags = std::move(sample_tag);
    samples.back().timestamp = now;
    samples.back().value = dist;
    active_ = true;
  }
}
// Adds `val` to the child recorder keyed by `tag`, creating the child on first use.
void SimpleDistributionRecorder::addSample(int64_t val, const TagSet &tag) {
  auto child = getRecorderWithTag(tag);
  child->addSample(val);
}
// Records `duration` on the child latency recorder keyed by `tag`, creating the child on
// first use.
void LatencyRecorder::addSample(std::chrono::nanoseconds duration, const TagSet &tag) {
  auto child = getRecorderWithTag(tag);
  child->addSample(duration);
}
// Logs a DBG3 latency summary of `digest`. Digest values are presumably nanoseconds
// (addSample takes std::chrono::nanoseconds); each is truncated to an integer and then
// divided by 1000, so it is printed as whole microseconds.
void LatencyRecorder::logPer30s(const folly::TDigest &digest) {
  auto toMicros = [](double nanos) { return uint64_t(nanos) / 1000; };
  XLOGF(DBG3,
        "Last 30s {}: count:{} mean:{}us min:{}us max:{}us p50:{}us p90:{}us p95:{}us p99:{}us",
        name_,
        static_cast<uint64_t>(digest.count()),
        toMicros(digest.mean()),
        toMicros(digest.min()),
        toMicros(digest.max()),
        toMicros(digest.estimateQuantile(0.50)),
        toMicros(digest.estimateQuantile(0.90)),
        toMicros(digest.estimateQuantile(0.95)),
        toMicros(digest.estimateQuantile(0.99)));
}
template <typename LatencyRecorderT>
OperationRecorderT<LatencyRecorderT>::OperationRecorderT(std::string_view name,
std::optional<TagSet> tag,
bool recordErrorCode)
: total_(fmt::format("{}.total", name), tag),
fails_(fmt::format("{}.fails", name), tag),
current_(fmt::format("{}.current", name), tag, /*resetWhenCollect=*/false),
succ_latencies_(fmt::format("{}.succ_latency", name), tag),
fail_latencies_(fmt::format("{}.fail_latency", name), tag),
recordErrorCode_(recordErrorCode) {}
// Reports the guarded operation's outcome exactly once (later calls are no-ops).
// The operation counts as successful if `success_` was already set or `code` is kOK. The
// guard then decrements `current_`. On success it records the elapsed time in
// succ_latencies_. On failure it bumps fails_ and records the elapsed time in
// fail_latencies_; when recordErrorCode_ is set, the failure is also tagged with
// "statusCode", using the per-thread caches so each code is stringified once per thread.
// When the guard carries tags, every recording uses them.
template <typename LatencyRecorderT>
void OperationRecorderT<LatencyRecorderT>::Guard::reportWithCode(status_code_t code) {
  if (reported_) {
    return;
  }
  success_ |= (code == StatusCode::kOK);
  auto latency = RelativeTime::now() - startTime_;
  auto &rec = recorder_;

  if (!tags_.has_value()) {
    rec.current_.addSample(-1);
    if (UNLIKELY(!success_)) {
      if (rec.recordErrorCode_) {
        auto &cachedTags = errorCodeTagSets[code];
        if (!cachedTags) {
          cachedTags = TagSet::create("statusCode", String(StatusCode::toString(code)));
        }
        rec.fails_.addSample(1, *cachedTags);
      } else {
        rec.fails_.addSample(1);
      }
      rec.fail_latencies_.addSample(latency);
    } else {
      rec.succ_latencies_.addSample(latency);
    }
  } else {
    auto &tags = *tags_;
    rec.current_.addSample(-1, tags);
    if (UNLIKELY(!success_)) {
      if (rec.recordErrorCode_) {
        auto &cachedName = errorCodeStrings[code];
        if (cachedName.empty()) {
          cachedName = StatusCode::toString(code);
        }
        rec.fails_.addSample(1, tags.newTagSet("statusCode", cachedName));
      } else {
        rec.fails_.addSample(1, tags);
      }
      rec.fail_latencies_.addSample(latency, tags);
    } else {
      rec.succ_latencies_.addSample(latency, tags);
    }
  }

  reported_ = true;
}
// Explicitly instantiate the operation recorder for both latency-recorder flavours so the
// template member definitions in this file are emitted in this translation unit.
template class OperationRecorderT<SimpleDistributionRecorder>;
template class OperationRecorderT<LatencyRecorder>;
// Creates a value recorder attached to the default Monitor instance.
// `tag` is a by-value sink and is moved into the delegated constructor instead of copied.
ValueRecorder::ValueRecorder(std::string_view name, std::optional<TagSet> tag, bool resetWhenCollect)
    : ValueRecorder(Monitor::getDefaultInstance(), name, std::move(tag), resetWhenCollect) {}
// Sets `val` on the child value recorder keyed by `tag`, creating the child on first use.
void ValueRecorder::set(int64_t val, const TagSet &tag) {
  auto child = getRecorderWithTag(tag);
  child->set(val);
}
void ValueRecorder::collect(std::vector<Sample> &samples) {
int64_t val = resetWhenCollect_ ? val_.exchange(0) : val_.load();
if (val > 0) {
auto now = UtcClock::now();
TagSet sample_tag = tag_;
sample_tag.addTag("host", gHostname);
sample_tag.addTag("pod", gPodname);
samples.emplace_back(Sample{name_, std::move(sample_tag), now, val});
active_ = true;
}
}
// No value-recorder-specific cleanup: forward unchanged to Recorder::collectAndClean.
void ValueRecorder::collectAndClean(std::vector<Sample> &samples, bool cleanInactive) {
  return Recorder::collectAndClean(samples, cleanInactive);
}
// Creates a lambda-backed recorder on the default Monitor and registers it immediately.
// `tag` is a by-value sink and is moved into the base instead of copied.
LambdaRecorder::LambdaRecorder(std::string_view name, std::optional<TagSet> tag /* = std::nullopt */)
    : Recorder(name, std::move(tag), Monitor::getDefaultInstance()) {
  registerRecorder();
}
// Invokes the stored getter under `mutex_` and, if the result is truthy (non-zero),
// emits it as a sample tagged with this recorder's tags plus "host"/"pod".
void LambdaRecorder::collect(std::vector<Sample> &samples) {
  // The lock is held only while calling the getter and is released before building the sample.
  auto value = [&] {
    auto lock = std::unique_lock(mutex_);
    return getter_();
  }();
  if (!value) {
    return;
  }
  TagSet sampleTag = tag_;
  sampleTag.addTag("host", gHostname);
  sampleTag.addTag("pod", gPodname);
  samples.emplace_back(Sample{name_, std::move(sampleTag), UtcClock::now(), value});
}
} // namespace hf3fs::monitor