core/prometheus/schedulers/TargetSubscriberScheduler.cpp

/*
 * Copyright 2024 iLogtail Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "prometheus/schedulers/TargetSubscriberScheduler.h"

#include <chrono>
#include <cstdlib>
#include <cstring>
#include <iomanip>
#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <unordered_map>
#include <vector>

#include "AppConfig.h"
#include "SelfMonitorMetricEvent.h"
#include "common/JsonUtil.h"
#include "common/StringTools.h"
#include "common/TimeUtil.h"
#include "common/http/Constant.h"
#include "common/timer/HttpRequestTimerEvent.h"
#include "logger/Logger.h"
#include "monitor/Monitor.h"
#include "monitor/metric_constants/MetricConstants.h"
#include "prometheus/Constants.h"
#include "prometheus/Utils.h"
#include "prometheus/async/PromFuture.h"
#include "prometheus/async/PromHttpRequest.h"
#include "prometheus/schedulers/ScrapeScheduler.h"

using namespace std;

namespace logtail {

std::chrono::steady_clock::time_point TargetSubscriberScheduler::mLastUpdateTime = std::chrono::steady_clock::now();
uint64_t TargetSubscriberScheduler::sDelaySeconds = 0;

TargetSubscriberScheduler::TargetSubscriberScheduler()
    : mQueueKey(0), mInputIndex(0), mServicePort(0), mUnRegisterMs(0) {
}

bool TargetSubscriberScheduler::Init(const Json::Value& scrapeConfig) {
    mScrapeConfigPtr = std::make_shared<ScrapeConfig>();
    if (!mScrapeConfigPtr->Init(scrapeConfig)) {
        return false;
    }
    mJobName = mScrapeConfigPtr->mJobName;
    mInterval = prometheus::RefeshIntervalSeconds;

    return true;
}

bool TargetSubscriberScheduler::operator<(const TargetSubscriberScheduler& other) const {
    return mJobName < other.mJobName;
}

void TargetSubscriberScheduler::OnSubscription(HttpResponse& response, uint64_t timestampMilliSec) {
    mSelfMonitor->AddCounter(METRIC_PLUGIN_PROM_SUBSCRIBE_TOTAL, response.GetStatusCode());
    mSelfMonitor->AddCounter(METRIC_PLUGIN_PROM_SUBSCRIBE_TIME_MS,
                             response.GetStatusCode(),
                             GetCurrentTimeInMilliSeconds() - timestampMilliSec);
    if (response.GetStatusCode() == 304) {
        // not modified: the target list is unchanged since the ETag we sent
        return;
    }
    if (response.GetStatusCode() != 200) {
        return;
    }
    if (response.GetHeader().count(prometheus::ETAG)) {
        mETag = response.GetHeader().at(prometheus::ETAG);
    }
    const string& content = *response.GetBody<string>();
    vector<PromTargetInfo> targetGroup;
    if (!ParseScrapeSchedulerGroup(content, targetGroup)) {
        return;
    }
    std::unordered_map<std::string, std::shared_ptr<ScrapeScheduler>> newScrapeSchedulerSet
        = BuildScrapeSchedulerSet(targetGroup);
    UpdateScrapeScheduler(newScrapeSchedulerSet);
    SET_GAUGE(mPromSubscriberTargets, mScrapeSchedulerMap.size());
    ADD_COUNTER(mTotalDelayMs, GetCurrentTimeInMilliSeconds() - timestampMilliSec);
}
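
// Diffs the freshly discovered schedulers against the running set under the
// write lock: schedulers missing from the new map are cancelled and removed,
// new ones get a jittered start. One reading of the "zero-cost upgrade" check
// below (an interpretation, not spelled out in the source): with the first
// scrape planned at now + jitter, the condition holds when the previous
// owner's hand-off time (mUnRegisterMs on upgrade, GetReBalanceMs() on
// re-balance) lies between one and two intervals before that first scrape,
// i.e. exactly when waiting for the jittered slot would leave a gap of more
// than one scrape interval, so the target is scraped once immediately.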
void TargetSubscriberScheduler::UpdateScrapeScheduler(
    std::unordered_map<std::string, std::shared_ptr<ScrapeScheduler>>& newScrapeSchedulerMap) {
    {
        WriteLock lock(mRWLock);
        vector<string> toRemove;
        // remove obsolete scrape work
        for (const auto& [k, v] : mScrapeSchedulerMap) {
            if (newScrapeSchedulerMap.find(k) == newScrapeSchedulerMap.end()) {
                toRemove.push_back(k);
            }
        }
        for (auto& k : toRemove) {
            mScrapeSchedulerMap[k]->Cancel();
            mScrapeSchedulerMap.erase(k);
        }

        // save new scrape work
        auto added = 0;
        auto total = 0;
        for (const auto& [k, v] : newScrapeSchedulerMap) {
            if (mScrapeSchedulerMap.find(k) == mScrapeSchedulerMap.end()) {
                added++;
                mScrapeSchedulerMap[k] = v;
                auto tmpCurrentMilliSeconds = GetCurrentTimeInMilliSeconds();
                auto tmpRandSleepMilliSec
                    = GetRandSleepMilliSec(v->GetId(), v->GetScrapeIntervalSeconds(), tmpCurrentMilliSeconds);
                // zero-cost upgrade
                if ((mUnRegisterMs > 0
                     && (tmpCurrentMilliSeconds + tmpRandSleepMilliSec - v->GetScrapeIntervalSeconds() * 1000
                         > mUnRegisterMs)
                     && (tmpCurrentMilliSeconds + tmpRandSleepMilliSec - v->GetScrapeIntervalSeconds() * 1000 * 2
                         < mUnRegisterMs))
                    || (v->GetReBalanceMs() > 0
                        && (tmpCurrentMilliSeconds + tmpRandSleepMilliSec - v->GetScrapeIntervalSeconds() * 1000
                            > v->GetReBalanceMs())
                        && (tmpCurrentMilliSeconds + tmpRandSleepMilliSec - v->GetScrapeIntervalSeconds() * 1000 * 2
                            < v->GetReBalanceMs()))) {
                    // scrape once just now
                    LOG_INFO(sLogger, ("scrape zero cost", ToString(tmpCurrentMilliSeconds)));
                    v->SetScrapeOnceTime(chrono::steady_clock::now(), chrono::system_clock::now());
                }
                v->ScheduleNext();
            }
        }
        total = mScrapeSchedulerMap.size();
        LOG_INFO(sLogger,
                 ("prom job", mJobName)("targets removed", toRemove.size())("added", added)("total", total));
    }
}

bool TargetSubscriberScheduler::ParseScrapeSchedulerGroup(const std::string& content,
                                                          std::vector<PromTargetInfo>& scrapeSchedulerGroup) {
    string errs;
    Json::Value root;
    if (!ParseJsonTable(content, root, errs) || !root.isArray()) {
        LOG_ERROR(sLogger,
                  ("http service discovery from operator failed", "Failed to parse JSON: " + errs)("job", mJobName));
        return false;
    }

    for (const auto& element : root) {
        if (!element.isObject()) {
            LOG_ERROR(
                sLogger,
                ("http service discovery from operator failed", "Invalid target group item found")("job", mJobName));
            return false;
        }

        // Parse targets
        vector<string> targets;
        if (element.isMember(prometheus::TARGETS) && element[prometheus::TARGETS].isArray()) {
            for (const auto& target : element[prometheus::TARGETS]) {
                if (target.isString()) {
                    targets.push_back(target.asString());
                } else {
                    LOG_ERROR(
                        sLogger,
                        ("http service discovery from operator failed", "Invalid target item found")("job", mJobName));
                    return false;
                }
            }
        }
        if (targets.empty()) {
            continue;
        }

        PromTargetInfo targetInfo;
        // Parse labels https://www.robustperception.io/life-of-a-label/
        Labels labels;
        if (element.isMember(prometheus::LABELS) && element[prometheus::LABELS].isObject()) {
            for (const string& labelKey : element[prometheus::LABELS].getMemberNames()) {
                labels.Set(labelKey, element[prometheus::LABELS][labelKey].asString());
            }
        }
        // the target hash is built from the job name, the raw __address__ and a
        // 16-hex-digit hash of the discovered labels, before config params are mixed in
        std::ostringstream rawHashStream;
        rawHashStream << std::setw(16) << std::setfill('0') << std::hex << labels.Hash();
        string rawAddress = labels.Get(prometheus::ADDRESS_LABEL_NAME);
        targetInfo.mHash = mScrapeConfigPtr->mJobName + rawAddress + rawHashStream.str();
        targetInfo.mInstance = targets[0];

        for (const auto& pair : mScrapeConfigPtr->mParams) {
            if (!pair.second.empty()) {
                labels.Set(prometheus::PARAM_LABEL_NAME + pair.first, pair.second[0]);
            }
        }
        // re-apply the discovered labels so they take precedence over any
        // __param_* label set from the scrape config above
        if (element.isMember(prometheus::LABELS) && element[prometheus::LABELS].isObject()) {
            for (const string& labelKey : element[prometheus::LABELS].getMemberNames()) {
                labels.Set(labelKey, element[prometheus::LABELS][labelKey].asString());
            }
        }

        if (labels.Get(prometheus::JOB).empty()) {
            labels.Set(prometheus::JOB, mJobName);
        }
        if (labels.Get(prometheus::SCHEME_LABEL_NAME).empty()) {
            labels.Set(prometheus::SCHEME_LABEL_NAME, mScrapeConfigPtr->mScheme);
        }
        if (labels.Get(prometheus::METRICS_PATH_LABEL_NAME).empty()) {
            labels.Set(prometheus::METRICS_PATH_LABEL_NAME, mScrapeConfigPtr->mMetricsPath);
        }
        if (labels.Get(prometheus::ADDRESS_LABEL_NAME).empty()) {
            continue;
        }
        targetInfo.mLabels = labels;
        scrapeSchedulerGroup.push_back(targetInfo);
    }
    return true;
}
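
// Turns discovered targets into ScrapeScheduler instances. Per target: run the
// job's relabel configs (targets dropped by relabeling, or relabeled down to
// zero labels, are skipped), derive host and port from __address__ (falling
// back to 80/443 when the scheme implies the port), assemble the metrics path
// with every __param_* label appended as a query parameter, and resolve the
// per-target scrape interval/timeout with the job-level settings as fallback.
// A purely illustrative walk-through: __address__ "10.0.0.5:9100", metrics
// path "metrics" and a label "__param_module=http" yield host "10.0.0.5",
// port 9100 and path "/metrics?module=http". The first scrape of each target
// is delayed by a jitter derived from its id (GetRandSleepMilliSec), spreading
// a job's targets over the interval instead of scraping them all at once.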
std::unordered_map<std::string, std::shared_ptr<ScrapeScheduler>>
TargetSubscriberScheduler::BuildScrapeSchedulerSet(std::vector<PromTargetInfo>& targetGroups) {
    std::unordered_map<std::string, std::shared_ptr<ScrapeScheduler>> scrapeSchedulerMap;
    for (auto& targetInfo : targetGroups) {
        // Relabel Config
        auto& resultLabel = targetInfo.mLabels;
        if (!mScrapeConfigPtr->mRelabelConfigs.Process(resultLabel)) {
            continue;
        }
        resultLabel.RemoveMetaLabels();
        if (resultLabel.Size() == 0) {
            continue;
        }

        string address = resultLabel.Get(prometheus::ADDRESS_LABEL_NAME);
        if (resultLabel.Get(prometheus::INSTANCE).empty()) {
            resultLabel.Set(prometheus::INSTANCE, address);
        }

        auto m = address.find(':');
        int32_t port = 0;
        if (m == string::npos) {
            // if no port, use the default port of the scheme
            if (resultLabel.Get(prometheus::SCHEME_LABEL_NAME) == prometheus::HTTP) {
                port = 80;
            } else if (resultLabel.Get(prometheus::SCHEME_LABEL_NAME) == prometheus::HTTPS) {
                port = 443;
            } else {
                continue;
            }
        } else {
            // parse port
            try {
                port = stoi(address.substr(m + 1));
            } catch (...) {
                continue;
            }
        }

        string host = address.substr(0, m);
        string scheme = resultLabel.Get(prometheus::SCHEME_LABEL_NAME);
        if (scheme.empty()) {
            scheme = mScrapeConfigPtr->mScheme;
        }

        auto buildFullMetricsPath = [](Labels& labels, const string& rawMetricsPath) {
            string metricsPath = labels.Get(prometheus::METRICS_PATH_LABEL_NAME);
            if (metricsPath.empty()) {
                metricsPath = rawMetricsPath;
            }
            if (metricsPath[0] != '/') {
                metricsPath = "/" + metricsPath;
            }
            // collect __param_* labels into a sorted query string
            map<string, string> params;
            labels.Range([&params](const string& key, const string& value) {
                if (StartWith(key, prometheus::PARAM_LABEL_NAME)) {
                    params[key.substr(strlen(prometheus::PARAM_LABEL_NAME))] = value;
                }
            });
            string paramsStr;
            for (const auto& pair : params) {
                if (!paramsStr.empty()) {
                    paramsStr += "&";
                }
                paramsStr += pair.first + "=" + pair.second;
            }
            string optionalQuestion;
            if (!paramsStr.empty()) {
                optionalQuestion = "?";
                if (metricsPath.find('?') != string::npos) {
                    optionalQuestion = "&";
                }
            }
            return metricsPath + optionalQuestion + paramsStr;
        };
        auto metricsPath = buildFullMetricsPath(resultLabel, mScrapeConfigPtr->mMetricsPath);

        auto scrapeIntervalSeconds = DurationToSecond(resultLabel.Get(prometheus::SCRAPE_INTERVAL_LABEL_NAME));
        if (scrapeIntervalSeconds == 0) {
            scrapeIntervalSeconds = mScrapeConfigPtr->mScrapeIntervalSeconds;
        }
        auto scrapeTimeoutSeconds = DurationToSecond(resultLabel.Get(prometheus::SCRAPE_TIMEOUT_LABEL_NAME));
        if (scrapeTimeoutSeconds == 0) {
            scrapeTimeoutSeconds = mScrapeConfigPtr->mScrapeTimeoutSeconds;
        }
        if (scrapeIntervalSeconds == 0 || scrapeTimeoutSeconds == 0) {
            LOG_ERROR(sLogger,
                      ("job", mJobName)("scrapeIntervalSeconds:", scrapeIntervalSeconds)("scrapeTimeoutSeconds:",
                                                                                         scrapeTimeoutSeconds));
            continue;
        }

        auto scrapeScheduler = std::make_shared<ScrapeScheduler>(mScrapeConfigPtr,
                                                                 host,
                                                                 port,
                                                                 scheme,
                                                                 metricsPath,
                                                                 scrapeIntervalSeconds,
                                                                 scrapeTimeoutSeconds,
                                                                 mQueueKey,
                                                                 mInputIndex,
                                                                 targetInfo);
        scrapeScheduler->SetComponent(mEventPool);

        auto randSleepMilliSec
            = GetRandSleepMilliSec(scrapeScheduler->GetId(), scrapeIntervalSeconds, GetCurrentTimeInMilliSeconds());
        auto firstExecTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(randSleepMilliSec);
        auto firstScrapeTime = std::chrono::system_clock::now() + std::chrono::milliseconds(randSleepMilliSec);
        scrapeScheduler->SetFirstExecTime(firstExecTime, firstScrapeTime);
        scrapeScheduler->InitSelfMonitor(mDefaultLabels);

        scrapeSchedulerMap[scrapeScheduler->GetId()] = scrapeScheduler;
    }
    return scrapeSchedulerMap;
}

string TargetSubscriberScheduler::GetId() const {
    return mJobName;
}
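
// The subscription loop re-arms itself: every round's done-callback handles
// the response, marks the round finished (ExecDone) and immediately schedules
// the next poll, so /jobs/<job>/targets is fetched once per refresh interval
// until the scheduler is cancelled. Cancel() invalidates the in-flight future
// before flipping mValidState, so a response racing with shutdown cannot
// re-arm the loop; it then cascades the cancellation to every ScrapeScheduler.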
void TargetSubscriberScheduler::ScheduleNext() {
    auto future = std::make_shared<PromFuture<HttpResponse&, uint64_t>>();
    future->AddDoneCallback([this](HttpResponse& response, uint64_t timestampMilliSec) {
        this->OnSubscription(response, timestampMilliSec);
        this->ExecDone();
        this->ScheduleNext();
        return true;
    });
    if (IsCancelled()) {
        mFuture->Cancel();
        return;
    }
    {
        WriteLock lock(mLock);
        mFuture = future;
    }
    auto event = BuildSubscriberTimerEvent(GetNextExecTime());
    Timer::GetInstance()->PushEvent(std::move(event));
}

void TargetSubscriberScheduler::Cancel() {
    mFuture->Cancel();
    {
        WriteLock lock(mLock);
        mValidState = false;
    }
    CancelAllScrapeScheduler();
}

void TargetSubscriberScheduler::SubscribeOnce(std::chrono::steady_clock::time_point execTime) {
    auto future = std::make_shared<PromFuture<HttpResponse&, uint64_t>>();
    future->AddDoneCallback([this](HttpResponse& response, uint64_t timestampMilliSec) {
        this->OnSubscription(response, timestampMilliSec);
        return true;
    });
    mFuture = future;
    auto event = BuildSubscriberTimerEvent(execTime);
    Timer::GetInstance()->PushEvent(std::move(event));
}

std::unique_ptr<TimerEvent>
TargetSubscriberScheduler::BuildSubscriberTimerEvent(std::chrono::steady_clock::time_point execTime) {
    map<string, string> httpHeader;
    httpHeader[prometheus::ACCEPT] = prometheus::APPLICATION_JSON;
    httpHeader[prometheus::X_PROMETHEUS_REFRESH_INTERVAL_SECONDS] = ToString(prometheus::RefeshIntervalSeconds);
    httpHeader[prometheus::USER_AGENT] = prometheus::PROMETHEUS_PREFIX + mPodName;
    if (!mETag.empty()) {
        // ask the operator to answer 304 if the target list is unchanged
        httpHeader[prometheus::IF_NONE_MATCH] = mETag;
    }
    auto body = TargetsInfoToString();
    auto request = std::make_unique<PromHttpRequest>(HTTP_GET,
                                                     false,
                                                     mServiceHost,
                                                     mServicePort,
                                                     "/jobs/" + URLEncode(GetId()) + "/targets",
                                                     "collector_id=" + mPodName,
                                                     httpHeader,
                                                     body,
                                                     HttpResponse(),
                                                     prometheus::RefeshIntervalSeconds,
                                                     1,
                                                     this->mFuture);
    auto timerEvent = std::make_unique<HttpRequestTimerEvent>(execTime, std::move(request));
    return timerEvent;
}
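
// Serializes the body sent along with each targets poll, presumably so the
// operator can weigh this collector's load when distributing targets. Sketch
// of the resulting JSON (key names are illustrative only; the literal strings
// come from prometheus::Constants):
//   {
//     "agent_info": {
//       "cpu_usage": 0.3,   "cpu_limit": 2.0,
//       "mem_usage": 512.0, "mem_limit": 2048,
//       "http_sink_in_items_total": 100, "http_sink_out_failed": 0,
//       "scrape_delay_seconds": 0
//     },
//     "targets_info": [ { "hash": "<scheduler id>", "size": <last scrape size> }, ... ]
//   }
// sDelaySeconds is a class-wide (static) accumulator of per-target execution
// delays; it is reported on every poll and reset once per refresh interval.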
string TargetSubscriberScheduler::TargetsInfoToString() const {
    Json::Value root;
    SelfMonitorMetricEvent wantAgentEvent;
    LoongCollectorMonitor::GetInstance()->GetAgentMetric(wantAgentEvent);
    SelfMonitorMetricEvent wantRunnerEvent;
    LoongCollectorMonitor::GetInstance()->GetRunnerMetric(METRIC_LABEL_VALUE_RUNNER_NAME_HTTP_SINK, wantRunnerEvent);

    root[prometheus::AGENT_INFO][prometheus::CPU_USAGE] = wantAgentEvent.GetGauge(METRIC_AGENT_CPU); // double
    root[prometheus::AGENT_INFO][prometheus::CPU_LIMIT] = AppConfig::GetInstance()->GetCpuUsageUpLimit(); // float
    root[prometheus::AGENT_INFO][prometheus::MEM_USAGE] = wantAgentEvent.GetGauge(METRIC_AGENT_MEMORY); // double
    root[prometheus::AGENT_INFO][prometheus::MEM_LIMIT] = AppConfig::GetInstance()->GetMemUsageUpLimit(); // int64_t
    root[prometheus::AGENT_INFO][prometheus::HTTP_SINK_IN_ITEMS_TOTAL]
        = wantRunnerEvent.GetCounter(METRIC_RUNNER_IN_ITEMS_TOTAL); // uint64_t
    root[prometheus::AGENT_INFO][prometheus::HTTP_SINK_OUT_FAILED]
        = wantRunnerEvent.GetCounter(METRIC_RUNNER_SINK_OUT_FAILED_ITEMS_TOTAL); // uint64_t
    {
        ReadLock lock(mRWLock);
        for (const auto& [k, v] : mScrapeSchedulerMap) {
            Json::Value targetInfo;
            targetInfo[prometheus::HASH] = v->GetId();
            targetInfo[prometheus::SIZE] = v->GetLastScrapeSize();
            sDelaySeconds += v->mExecDelayCount;
            v->mExecDelayCount = 0;
            root[prometheus::TARGETS_INFO].append(targetInfo);
        }
    }
    auto curTime = std::chrono::steady_clock::now();
    auto needToClear = curTime - mLastUpdateTime >= std::chrono::seconds(prometheus::RefeshIntervalSeconds);
    root[prometheus::AGENT_INFO][prometheus::SCRAPE_DELAY_SECONDS] = sDelaySeconds;
    if (needToClear) {
        sDelaySeconds = 0;
        mLastUpdateTime = curTime;
    }

    return root.toStyledString();
}

void TargetSubscriberScheduler::CancelAllScrapeScheduler() {
    ReadLock lock(mRWLock);
    for (const auto& [k, v] : mScrapeSchedulerMap) {
        v->Cancel();
    }
}

void TargetSubscriberScheduler::InitSelfMonitor(const MetricLabels& defaultLabels) {
    mDefaultLabels = defaultLabels;
    mDefaultLabels.emplace_back(METRIC_LABEL_KEY_JOB, mJobName);
    mDefaultLabels.emplace_back(METRIC_LABEL_KEY_POD_NAME, mPodName);
    mDefaultLabels.emplace_back(METRIC_LABEL_KEY_SERVICE_HOST, mServiceHost);
    mDefaultLabels.emplace_back(METRIC_LABEL_KEY_SERVICE_PORT, ToString(mServicePort));

    static const std::unordered_map<std::string, MetricType> sSubscriberMetricKeys = {
        {METRIC_PLUGIN_PROM_SUBSCRIBE_TOTAL, MetricType::METRIC_TYPE_COUNTER},
        {METRIC_PLUGIN_PROM_SUBSCRIBE_TIME_MS, MetricType::METRIC_TYPE_COUNTER},
    };
    mSelfMonitor = std::make_shared<PromSelfMonitorUnsafe>();
    mSelfMonitor->InitMetricManager(sSubscriberMetricKeys, mDefaultLabels);

    WriteMetrics::GetInstance()->PrepareMetricsRecordRef(
        mMetricsRecordRef, MetricCategory::METRIC_CATEGORY_PLUGIN_SOURCE, std::move(mDefaultLabels));
    mPromSubscriberTargets = mMetricsRecordRef.CreateIntGauge(METRIC_PLUGIN_PROM_SUBSCRIBE_TARGETS);
    mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_PLUGIN_TOTAL_DELAY_MS);
}

} // namespace logtail