core/prometheus/PrometheusInputRunner.cpp (232 lines of code) (raw):
/*
* Copyright 2024 iLogtail Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "PrometheusInputRunner.h"
#include <chrono>
#include <memory>
#include <string>
#include "application/Application.h"
#include "common/Flags.h"
#include "common/JsonUtil.h"
#include "common/StringTools.h"
#include "common/http/AsynCurlRunner.h"
#include "common/http/Constant.h"
#include "common/http/Curl.h"
#include "common/timer/Timer.h"
#include "logger/Logger.h"
#include "monitor/metric_constants/MetricConstants.h"
#include "prometheus/Constants.h"
#include "prometheus/Utils.h"
using namespace std;
DECLARE_FLAG_STRING(operator_service);
DECLARE_FLAG_INT32(operator_service_port);
DECLARE_FLAG_STRING(_pod_name_);
namespace logtail {
PrometheusInputRunner::PrometheusInputRunner()
: mServiceHost(STRING_FLAG(operator_service)),
mServicePort(INT32_FLAG(operator_service_port)),
mPodName(STRING_FLAG(_pod_name_)),
mEventPool(true),
mUnRegisterMs(0) {
// self monitor
MetricLabels labels;
labels.emplace_back(METRIC_LABEL_KEY_RUNNER_NAME, METRIC_LABEL_VALUE_RUNNER_NAME_PROMETHEUS);
labels.emplace_back(METRIC_LABEL_KEY_INSTANCE_ID, Application::GetInstance()->GetInstanceId());
labels.emplace_back(METRIC_LABEL_KEY_POD_NAME, mPodName);
labels.emplace_back(METRIC_LABEL_KEY_SERVICE_HOST, mServiceHost);
labels.emplace_back(METRIC_LABEL_KEY_SERVICE_PORT, ToString(mServicePort));
DynamicMetricLabels dynamicLabels;
dynamicLabels.emplace_back(METRIC_LABEL_KEY_PROJECT, [this]() -> std::string { return this->GetAllProjects(); });
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(
mMetricsRecordRef, MetricCategory::METRIC_CATEGORY_RUNNER, std::move(labels), std::move(dynamicLabels));
mPromRegisterState = mMetricsRecordRef.CreateIntGauge(METRIC_RUNNER_CLIENT_REGISTER_STATE);
mPromJobNum = mMetricsRecordRef.CreateIntGauge(METRIC_RUNNER_JOBS_TOTAL);
mPromRegisterRetryTotal = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_CLIENT_REGISTER_RETRY_TOTAL);
}
/// @brief receive scrape jobs from input plugins and update scrape jobs
void PrometheusInputRunner::UpdateScrapeInput(std::shared_ptr<TargetSubscriberScheduler> targetSubscriber,
const MetricLabels& defaultLabels,
const string& projectName) {
RemoveScrapeInput(targetSubscriber->GetId());
targetSubscriber->mServiceHost = mServiceHost;
targetSubscriber->mServicePort = mServicePort;
targetSubscriber->mPodName = mPodName;
targetSubscriber->InitSelfMonitor(defaultLabels);
targetSubscriber->mUnRegisterMs = mUnRegisterMs.load();
targetSubscriber->SetComponent(&mEventPool);
auto currSystemTime = chrono::system_clock::now();
auto randSleepMilliSec
= GetRandSleepMilliSec(targetSubscriber->GetId(),
prometheus::RefeshIntervalSeconds,
chrono::duration_cast<chrono::milliseconds>(currSystemTime.time_since_epoch()).count());
auto firstExecTime = chrono::steady_clock::now() + chrono::milliseconds(randSleepMilliSec);
auto firstSubscribeTime = currSystemTime + chrono::milliseconds(randSleepMilliSec);
targetSubscriber->SetFirstExecTime(firstExecTime, firstSubscribeTime);
// 1. add subscriber to mTargetSubscriberSchedulerMap
{
WriteLock lock(mSubscriberMapRWLock);
mTargetSubscriberSchedulerMap[targetSubscriber->GetId()] = targetSubscriber;
}
// 2. build Ticker Event and add it to Timer
targetSubscriber->ScheduleNext();
{
ReadLock lock(mSubscriberMapRWLock);
SET_GAUGE(mPromJobNum, mTargetSubscriberSchedulerMap.size());
}
// 3. add project name to mJobNameToProjectNameMap for self monitor
{
WriteLock lock(mProjectRWLock);
mJobNameToProjectNameMap[targetSubscriber->GetId()] = projectName;
}
}
void PrometheusInputRunner::RemoveScrapeInput(const std::string& jobName) {
{
WriteLock lock(mSubscriberMapRWLock);
if (mTargetSubscriberSchedulerMap.count(jobName)) {
mTargetSubscriberSchedulerMap[jobName]->Cancel();
mTargetSubscriberSchedulerMap.erase(jobName);
SET_GAUGE(mPromJobNum, mTargetSubscriberSchedulerMap.size());
}
}
{
WriteLock lock(mProjectRWLock);
if (mJobNameToProjectNameMap.count(jobName)) {
mJobNameToProjectNameMap.erase(jobName);
}
}
}
/// @brief targets discovery and start scrape work
void PrometheusInputRunner::Init() {
std::lock_guard<mutex> lock(mStartMutex);
if (mIsStarted) {
return;
}
LOG_INFO(sLogger, ("PrometheusInputRunner", "Start"));
mIsStarted = true;
#ifndef APSARA_UNIT_TEST_MAIN
Timer::GetInstance()->Init();
AsynCurlRunner::GetInstance()->Init();
#endif
LOG_INFO(sLogger, ("PrometheusInputRunner", "register"));
// only register when operator exist
if (!mServiceHost.empty()) {
mIsThreadRunning.store(true);
mThreadRes = std::async(launch::async, [this]() {
std::lock_guard<mutex> lock(mRegisterMutex);
int retry = 0;
while (mIsThreadRunning.load()) {
++retry;
auto httpResponse = SendRegisterMessage(prometheus::REGISTER_COLLECTOR_PATH);
if (httpResponse.GetStatusCode() != 200) {
ADD_COUNTER(mPromRegisterRetryTotal, 1);
if (retry % 10 == 0) {
LOG_INFO(sLogger,
("register failed, retried", retry)("statusCode", httpResponse.GetStatusCode()));
}
} else {
// register success
// response will be { "unRegisterMs": 30000 }
if (!httpResponse.GetBody<string>()->empty()) {
string responseStr = *httpResponse.GetBody<string>();
string errMsg;
Json::Value responseJson;
if (!ParseJsonTable(responseStr, responseJson, errMsg)) {
LOG_ERROR(sLogger, ("register failed, parse response failed", responseStr));
}
if (responseJson.isMember(prometheus::UNREGISTER_MS)
&& responseJson[prometheus::UNREGISTER_MS].isString()) {
auto tmpStr = responseJson[prometheus::UNREGISTER_MS].asString();
if (tmpStr.empty()) {
mUnRegisterMs = 0;
} else {
uint64_t unRegisterMs{};
StringTo(tmpStr, unRegisterMs);
mUnRegisterMs.store(unRegisterMs);
// adjust unRegisterMs to scrape targets for zero-cost
mUnRegisterMs -= 1000;
LOG_INFO(sLogger, ("unRegisterMs", ToString(mUnRegisterMs)));
}
}
}
SET_GAUGE(mPromRegisterState, 1);
LOG_INFO(sLogger, ("Register Success", mPodName));
// subscribe immediately
SubscribeOnce();
break;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
});
}
}
/// @brief stop scrape work and clear all scrape jobs
void PrometheusInputRunner::Stop() {
std::lock_guard<mutex> lock(mStartMutex);
if (!mIsStarted) {
return;
}
mIsStarted = false;
mIsThreadRunning.store(false);
if (mThreadRes.valid()) {
mThreadRes.wait_for(chrono::seconds(1));
}
LOG_INFO(sLogger, ("PrometheusInputRunner", "cancel all target subscribers"));
CancelAllTargetSubscriber();
{
WriteLock lock(mSubscriberMapRWLock);
mTargetSubscriberSchedulerMap.clear();
}
// only unregister when operator exist
if (!mServiceHost.empty()) {
LOG_INFO(sLogger, ("PrometheusInputRunner", "unregister"));
auto res = std::async(launch::async, [this]() {
std::lock_guard<mutex> lock(mRegisterMutex);
for (int retry = 0; retry < 3; ++retry) {
auto httpResponse = SendRegisterMessage(prometheus::UNREGISTER_COLLECTOR_PATH);
if (httpResponse.GetStatusCode() != 200) {
LOG_ERROR(sLogger, ("unregister failed, statusCode", httpResponse.GetStatusCode()));
} else {
LOG_INFO(sLogger, ("Unregister Success", mPodName));
SET_GAUGE(mPromRegisterState, 0);
break;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
});
}
LOG_INFO(sLogger, ("PrometheusInputRunner", "Stop"));
}
bool PrometheusInputRunner::HasRegisteredPlugins() const {
ReadLock lock(mSubscriberMapRWLock);
return !mTargetSubscriberSchedulerMap.empty();
}
void PrometheusInputRunner::EventGC() {
mEventPool.CheckGC();
}
HttpResponse PrometheusInputRunner::SendRegisterMessage(const string& url) const {
HttpResponse httpResponse;
#ifdef APSARA_UNIT_TEST_MAIN
httpResponse.SetStatusCode(200);
return httpResponse;
#endif
map<string, string> httpHeader;
if (!SendHttpRequest(
make_unique<HttpRequest>(
HTTP_GET, false, mServiceHost, mServicePort, url, "pod_name=" + mPodName, httpHeader, "", 10),
httpResponse)) {
LOG_ERROR(sLogger, ("curl error", "")("url", url)("pod_name", mPodName));
}
return httpResponse;
}
void PrometheusInputRunner::CancelAllTargetSubscriber() {
ReadLock lock(mSubscriberMapRWLock);
for (auto& it : mTargetSubscriberSchedulerMap) {
it.second->Cancel();
}
}
void PrometheusInputRunner::SubscribeOnce() {
ReadLock lock(mSubscriberMapRWLock);
for (auto& [k, v] : mTargetSubscriberSchedulerMap) {
v->SubscribeOnce(std::chrono::steady_clock::now());
}
}
string PrometheusInputRunner::GetAllProjects() {
string result;
set<string> existProjects;
ReadLock lock(mProjectRWLock);
for (auto& [k, v] : mJobNameToProjectNameMap) {
if (existProjects.find(v) == existProjects.end()) {
if (!result.empty()) {
result += " ";
}
existProjects.insert(v);
result += v;
}
}
return result;
}
}; // namespace logtail