fb303/ThreadCachedServiceData.cpp (144 lines of code) (raw):
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <fb303/ThreadCachedServiceData.h>
#include <folly/Indestructible.h>
#include <folly/Singleton.h>
using std::chrono::milliseconds;
namespace {
static const std::string kFunctionId =
"ThreadCachedStatsMap::aggregateAcrossAllThreads";
}
namespace facebook {
namespace fb303 {
class PublisherManager {
public:
struct Worker {
folly::FunctionScheduler fs_;
Worker() {
fs_.addFunction(
[] { ThreadCachedServiceData::getInternal().publishStats(); },
ThreadCachedServiceData::getInternal().getPublisherInterval(),
kFunctionId);
fs_.setThreadName("servicedata-pub");
fs_.start();
}
};
folly::Synchronized<folly::Optional<Worker>> worker_;
PublisherManager() {
if (ThreadCachedServiceData::getInternal().publishThreadRunning()) {
// Singleton was shutdown. Read parameters from LeakySingleton
// and recreate the publisher thread again.
worker_.wlock()->emplace();
}
}
};
namespace {
folly::Singleton<PublisherManager> publisherManager;
}
// ExportedStatMap will utilize a default stat object,
// MinuteTenMinuteHourTimeSeries, as a blueprint for creating new timeseries
// if one is not explicitly specified. So we define these here that are used
// by their respective wrapper abstractions.
const ExportedStat& MinuteTimeseriesWrapper::templateExportedStat() {
static const folly::Indestructible<MinuteTimeSeries<CounterType>> obj;
return *obj.get();
}
const ExportedStat& QuarterMinuteOnlyTimeseriesWrapper::templateExportedStat() {
static const folly::Indestructible<QuarterMinuteOnlyTimeSeries<CounterType>>
obj;
return *obj.get();
}
const ExportedStat& MinuteOnlyTimeseriesWrapper::templateExportedStat() {
static const folly::Indestructible<MinuteOnlyTimeSeries<CounterType>> obj;
return *obj.get();
}
ThreadCachedServiceData::StatsThreadLocal&
ThreadCachedServiceData::getStatsThreadLocal() {
static folly::Indestructible<ThreadCachedServiceData::StatsThreadLocal>
threadLocal{[]() {
return new ThreadCachedServiceData::ThreadLocalStatsMap{fbData.ptr()};
}};
return *threadLocal;
}
ThreadCachedServiceData& ThreadCachedServiceData::getInternal() {
static ThreadCachedServiceData* instance = new ThreadCachedServiceData();
return *instance;
}
ThreadCachedServiceData* ThreadCachedServiceData::get() {
publisherManager.vivify();
return &getInternal();
}
std::shared_ptr<ThreadCachedServiceData> ThreadCachedServiceData::getShared() {
return std::shared_ptr<ThreadCachedServiceData>(
std::shared_ptr<void>{}, ThreadCachedServiceData::get());
}
ThreadCachedServiceData::ThreadCachedServiceData()
: serviceData_{fbData.ptr()},
threadLocalStats_{&ThreadCachedServiceData::getStatsThreadLocal()} {}
void ThreadCachedServiceData::publishStats() {
for (ThreadLocalStatsMap& tlsm : threadLocalStats_->accessAllThreads()) {
tlsm.aggregate();
}
}
void ThreadCachedServiceData::startPublishThread(milliseconds interval) {
// startPublishThread(-1) is intended to be used by libraries that use
// ThreadCachedServiceData and want to ensure that the publish thread are
// running. (Since they don't control the main function and therefore don't
// know if it was started by the main program.)
// If the interval isn't positive, perform a fast check to see if the thread
// has already been started.
if (interval <= milliseconds(0) &&
interval_.load(std::memory_order_relaxed) != milliseconds(0)) {
return;
}
if (interval <= milliseconds(0)) {
// default to 1 second if the caller passed in a non-positive value.
interval = milliseconds(1000);
}
if (auto mgr = publisherManager.try_get()) {
mgr->worker_.withWLock([&](auto& worker) {
interval_.store(interval, std::memory_order_relaxed);
worker.emplace();
});
}
}
void ThreadCachedServiceData::stopPublishThread() {
if (auto mgr = publisherManager.try_get()) {
mgr->worker_.withWLock([&](auto& worker) {
interval_.store(milliseconds(0), std::memory_order_relaxed);
worker.reset();
});
}
}
bool ThreadCachedServiceData::publishThreadRunning() const {
return getPublisherInterval() > milliseconds(0);
}
int64_t ThreadCachedServiceData::setCounter(
folly::StringPiece key,
int64_t value) {
// Data set via setCounter() is implicitly not cachable in each thread:
// the last call to setCounter() always has to win. Therefore, set the
// ServiceData counter directly rather than using the threadLocalStats_.
//
// In general, callers who use this API are not performance sensitive: almost
// all of the call sites are callers who use it on rare occasions to reset
// the counter to 0, or callers who maintain their own stat counter
// internally and then periodically call setCounter() once every several
// seconds to replace the exported ServiceData counter with their up-to-date
// internal version.
return getServiceData()->setCounter(key, value);
}
void ThreadCachedServiceData::clearCounter(folly::StringPiece key) {
// As with setCounter(), clearCounter() is not easily cachable on a
// per-thread basis, and is similarly non performance sensitive.
getServiceData()->clearCounter(key);
}
void ThreadCachedServiceData::zeroStats() {
// Call publishStats() to clear out all thread-cached data
publishStats();
// Then zero out the ServiceData stats
getServiceData()->zeroStats();
}
void ThreadCachedServiceData::addHistAndStatValue(
folly::StringPiece key,
int64_t value,
bool /*checkContains*/) {
getThreadStats()->addStatValue(key, value);
getThreadStats()->addHistogramValue(key, value);
}
void ThreadCachedServiceData::addHistAndStatValues(
folly::StringPiece key,
const folly::Histogram<int64_t>& values,
time_t now,
int64_t sum,
int64_t nsamples,
bool checkContains) {
getServiceData()->addHistAndStatValues(
key, values, now, sum, nsamples, checkContains);
}
void ThreadCachedServiceData::addStatValue(
folly::StringPiece key,
int64_t value,
ExportType exportType) {
using KeyCacheTable =
std::array<ExportKeyCache, ExportTypeMeta::kNumExportTypes>;
static folly::ThreadLocal<KeyCacheTable> keyCacheTable;
if (UNLIKELY(!(*keyCacheTable)[exportType].has(key))) {
// This is not present in the threadlocal export set; possible it was
// not yet registered with the underlying ServiceData impl.
// This time around, pass it to the ServiceData so the type is exported
getServiceData()->addStatExportType(key, exportType);
(*keyCacheTable)[exportType].add(key);
}
// now we know the export was done; finally bump the counter
addStatValue(key, value);
}
} // namespace fb303
} // namespace facebook