core/prometheus/schedulers/ScrapeScheduler.cpp
/*
* Copyright 2024 iLogtail Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "prometheus/schedulers/ScrapeScheduler.h"
#include <cstddef>
#include <chrono>
#include <memory>
#include <string>
#include <utility>
#include "collection_pipeline/queue/ProcessQueueManager.h"
#include "collection_pipeline/queue/QueueKey.h"
#include "common/StringTools.h"
#include "common/TimeUtil.h"
#include "common/http/Constant.h"
#include "common/timer/HttpRequestTimerEvent.h"
#include "logger/Logger.h"
#include "prometheus/Constants.h"
#include "prometheus/Utils.h"
#include "prometheus/async/PromFuture.h"
#include "prometheus/async/PromHttpRequest.h"
#include "prometheus/component/StreamScraper.h"
using namespace std;
namespace logtail {
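
// One ScrapeScheduler drives the scraping of a single Prometheus target: it holds
// the target's connection settings and scrape config, schedules the periodic HTTP
// requests, and forwards the streamed samples downstream. mScrapeResponseSizeBytes
// starts at -1, which appears to act as a "no scrape completed yet" sentinel until
// the first response is processed.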
ScrapeScheduler::ScrapeScheduler(std::shared_ptr<ScrapeConfig> scrapeConfigPtr,
                                 string host,
                                 int32_t port,
                                 std::string scheme,
                                 std::string metricsPath,
                                 uint64_t scrapeIntervalSeconds,
                                 uint64_t scrapeTimeoutSeconds,
                                 QueueKey queueKey,
                                 size_t inputIndex,
                                 const PromTargetInfo& targetInfo)
    : mScrapeConfigPtr(std::move(scrapeConfigPtr)),
      mHost(std::move(host)),
      mPort(port),
      mTargetInfo(targetInfo),
      mMetricsPath(std::move(metricsPath)),
      mScheme(std::move(scheme)),
      mScrapeTimeoutSeconds(scrapeTimeoutSeconds),
      mQueueKey(queueKey),
      mInputIndex(inputIndex),
      mScrapeResponseSizeBytes(-1) {
    mInterval = scrapeIntervalSeconds;
}
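
// Completion callback for a scrape request: records self-monitor counters keyed by
// HTTP status code, classifies the scrape result (curl error, non-200 status, or
// success), flushes the streamed samples on success, and attaches the scrape
// metadata (duration in seconds, up state, scrape state) before sending the
// metrics downstream.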
void ScrapeScheduler::OnMetricResult(HttpResponse& response, uint64_t) {
static double sRate = 0.001;
auto now = GetCurrentTimeInMilliSeconds();
auto scrapeTimestampMilliSec
= chrono::duration_cast<chrono::milliseconds>(mLatestScrapeTime.time_since_epoch()).count();
auto scrapeDurationMilliSeconds = now - scrapeTimestampMilliSec;
auto* streamScraper = response.GetBody<prom::StreamScraper>();
mSelfMonitor->AddCounter(METRIC_PLUGIN_OUT_EVENTS_TOTAL, response.GetStatusCode());
mSelfMonitor->AddCounter(METRIC_PLUGIN_OUT_SIZE_BYTES, response.GetStatusCode(), streamScraper->mRawSize);
mSelfMonitor->AddCounter(METRIC_PLUGIN_PROM_SCRAPE_TIME_MS, response.GetStatusCode(), scrapeDurationMilliSeconds);
const auto& networkStatus = response.GetNetworkStatus();
string scrapeState;
auto scrapeDurationSeconds = scrapeDurationMilliSeconds * sRate;
auto upState = false;
if (networkStatus.mCode != NetworkCode::Ok) {
// not 0 means curl error
scrapeState = prom::NetworkCodeToState(networkStatus.mCode);
} else if (response.GetStatusCode() != 200) {
scrapeState = prom::HttpCodeToState(response.GetStatusCode());
} else {
// 0 means success
scrapeState = prom::NetworkCodeToState(NetworkCode::Ok);
upState = true;
}
if (response.GetStatusCode() != 200) {
LOG_WARNING(sLogger,
("scrape failed, status code", response.GetStatusCode())("target", mTargetInfo.mHash)(
"curl msg", response.GetNetworkStatus().mMessage));
}
streamScraper->mStreamIndex++;
if (upState) {
streamScraper->FlushCache();
}
streamScraper->SetAutoMetricMeta(scrapeDurationSeconds, upState, scrapeState);
streamScraper->SendMetrics();
mScrapeResponseSizeBytes = streamScraper->mRawSize;
streamScraper->Reset();
ADD_COUNTER(mPluginTotalDelayMs, scrapeDurationMilliSeconds);
}
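
// The target hash doubles as this scheduler's unique identity.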
string ScrapeScheduler::GetId() const {
    return mTargetInfo.mHash;
}

uint64_t ScrapeScheduler::GetScrapeIntervalSeconds() const {
    return mInterval;
}

void ScrapeScheduler::SetComponent(EventPool* eventPool) {
    mEventPool = eventPool;
}
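
// Arms the next scrape. The done-callback retries 401 responses (refreshing the
// scrape authorization when it is stale), forwards normal responses to
// OnMetricResult, and reschedules itself; the validity-check callback delays the
// scrape while the process queue cannot accept more data. If the scheduler has
// been cancelled, the pending futures are cancelled instead of arming a new
// timer event.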
void ScrapeScheduler::ScheduleNext() {
    auto future = std::make_shared<PromFuture<HttpResponse&, uint64_t>>();
    auto isContextValidFuture = std::make_shared<PromFuture<>>();
    future->AddDoneCallback([this](HttpResponse& response, uint64_t timestampMilliSec) {
        if (response.GetStatusCode() == 401) {
            auto duration
                = chrono::duration_cast<chrono::seconds>(mLatestScrapeTime - mScrapeConfigPtr->mLastUpdateTime).count();
            if ((duration <= mInterval && duration > 0) || mScrapeConfigPtr->UpdateAuthorization()) {
                LOG_WARNING(sLogger, ("retry", GetId()));
                this->ScheduleNext();
                return true;
            }
        }
        this->OnMetricResult(response, timestampMilliSec);
        this->ExecDone();
        this->ScheduleNext();
        return true;
    });
    isContextValidFuture->AddDoneCallback([this]() -> bool {
        if (ProcessQueueManager::GetInstance()->IsValidToPush(mQueueKey)) {
            return true;
        }
        this->DelayExecTime(1);
        this->mExecDelayCount++;
        ADD_COUNTER(this->mPromDelayTotal, 1);
        this->ScheduleNext();
        return false;
    });

    if (IsCancelled()) {
        mFuture->Cancel();
        mIsContextValidFuture->Cancel();
        return;
    }

    {
        WriteLock lock(mLock);
        mFuture = future;
        mIsContextValidFuture = isContextValidFuture;
    }

    auto event = BuildScrapeTimerEvent(GetNextExecTime());
    Timer::GetInstance()->PushEvent(std::move(event));
}
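
// Issues a single scrape at the given time without rescheduling; unlike
// ScheduleNext, the done-callback only forwards the response to OnMetricResult.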
void ScrapeScheduler::ScrapeOnce(std::chrono::steady_clock::time_point execTime) {
    auto future = std::make_shared<PromFuture<HttpResponse&, uint64_t>>();
    future->AddDoneCallback([this](HttpResponse& response, uint64_t timestampMilliSec) {
        this->OnMetricResult(response, timestampMilliSec);
        return true;
    });
    mFuture = future;
    auto event = BuildScrapeTimerEvent(execTime);
    Timer::GetInstance()->PushEvent(std::move(event));
}
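
// Builds the HTTP request for one scrape and wraps it in a timer event. The
// response body is streamed through a prom::StreamScraper so samples are parsed
// as they arrive. The retry value passed to the request is derived from how many
// timeouts fit into one scrape interval, minus one for the initial attempt:
// e.g. a 30s interval with a 10s timeout yields a retry value of 2.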
std::unique_ptr<TimerEvent> ScrapeScheduler::BuildScrapeTimerEvent(std::chrono::steady_clock::time_point execTime) {
auto retry = mInterval / mScrapeTimeoutSeconds;
if (retry > 0) {
retry -= 1;
}
auto request = std::make_unique<PromHttpRequest>(
HTTP_GET,
mScheme == prometheus::HTTPS,
mHost,
mPort,
mMetricsPath,
"",
mScrapeConfigPtr->mRequestHeaders,
"",
HttpResponse(
new prom::StreamScraper(
mTargetInfo.mLabels, mQueueKey, mInputIndex, mTargetInfo.mHash, mEventPool, mLatestScrapeTime),
[](void* p) { delete static_cast<prom::StreamScraper*>(p); },
prom::StreamScraper::MetricWriteCallback),
mScrapeTimeoutSeconds,
retry,
this->mFuture,
this->mIsContextValidFuture,
mScrapeConfigPtr->mFollowRedirects,
mScrapeConfigPtr->mEnableTLS ? std::optional<CurlTLS>(mScrapeConfigPtr->mTLS) : std::nullopt);
auto timerEvent = std::make_unique<HttpRequestTimerEvent>(execTime, std::move(request));
return timerEvent;
}
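
// Cancels any in-flight futures and marks the scheduler as no longer valid.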
void ScrapeScheduler::Cancel() {
    if (mFuture != nullptr) {
        mFuture->Cancel();
    }
    if (mIsContextValidFuture != nullptr) {
        mIsContextValidFuture->Cancel();
    }
    {
        WriteLock lock(mLock);
        mValidState = false;
    }
}
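
// Sets up per-target self-monitor metrics: counters keyed by HTTP status code
// (out events, out size, scrape time) plus plugin-level delay counters, all
// labeled with the target instance.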
void ScrapeScheduler::InitSelfMonitor(const MetricLabels& defaultLabels) {
    mSelfMonitor = std::make_shared<PromSelfMonitorUnsafe>();
    MetricLabels labels = defaultLabels;
    labels.emplace_back(METRIC_LABEL_KEY_INSTANCE, mTargetInfo.mInstance);

    static const std::unordered_map<std::string, MetricType> sScrapeMetricKeys
        = {{METRIC_PLUGIN_OUT_EVENTS_TOTAL, MetricType::METRIC_TYPE_COUNTER},
           {METRIC_PLUGIN_OUT_SIZE_BYTES, MetricType::METRIC_TYPE_COUNTER},
           {METRIC_PLUGIN_PROM_SCRAPE_TIME_MS, MetricType::METRIC_TYPE_COUNTER}};
    mSelfMonitor->InitMetricManager(sScrapeMetricKeys, labels);

    WriteMetrics::GetInstance()->PrepareMetricsRecordRef(
        mMetricsRecordRef, MetricCategory::METRIC_CATEGORY_PLUGIN_SOURCE, std::move(labels));
    mPromDelayTotal = mMetricsRecordRef.CreateCounter(METRIC_PLUGIN_PROM_SCRAPE_DELAY_TOTAL);
    mPluginTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_PLUGIN_TOTAL_DELAY_MS);
}

} // namespace logtail