core/unittest/prometheus/ScrapeSchedulerUnittest.cpp (297 lines of code) (raw):

/* * Copyright 2024 iLogtail Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include <unistd.h> #include <chrono> #include <memory> #include <string> #include "FileSystemUtil.h" #include "common/http/Curl.h" #include "common/http/HttpResponse.h" #include "common/timer/Timer.h" #include "models/RawEvent.h" #include "prometheus/Constants.h" #include "prometheus/async/PromFuture.h" #include "prometheus/component/StreamScraper.h" #include "prometheus/labels/Labels.h" #include "prometheus/schedulers/ScrapeConfig.h" #include "prometheus/schedulers/ScrapeScheduler.h" #include "unittest/Unittest.h" using namespace std; namespace logtail { class ScrapeSchedulerUnittest : public testing::Test { public: void TestInitscrapeScheduler(); void TestProcess(); void TestStreamMetricWriteCallback(); void TestReceiveMessage(); void TestScheduler(); void TestTokenUpdate(); void TestQueueIsFull(); void TestExactlyScrape(); protected: void SetUp() override { OverwriteFile(mFilePath, mKey); mScrapeConfig = make_shared<ScrapeConfig>(); mScrapeConfig->mJobName = "test_job"; mScrapeConfig->mScheme = "http"; mScrapeConfig->mScrapeIntervalSeconds = 10; mScrapeConfig->mScrapeTimeoutSeconds = 10; mScrapeConfig->mMetricsPath = "/metrics"; mScrapeConfig->mRequestHeaders = {{"Authorization", "Bearer xxxxx"}}; mScrapeConfig->mAuthType = "Bearer"; mScrapeConfig->mBearerTokenPath = "test_password.file"; } void TearDown() override { Timer::GetInstance()->Clear(); remove(mFilePath.c_str()); } private: std::shared_ptr<ScrapeConfig> mScrapeConfig; string mFilePath = "prom_password.file"; string mKey = "test_password.file"; }; void ScrapeSchedulerUnittest::TestInitscrapeScheduler() { Labels labels; labels.Set(prometheus::ADDRESS_LABEL_NAME, "localhost:8080"); labels.Set("testb", "valueb"); labels.Set("testa", "localhost:8080"); PromTargetInfo targetInfo; targetInfo.mLabels = labels; targetInfo.mHash = "test_joblocalhost:8080887d0db7cce49fc7"; ScrapeScheduler event(mScrapeConfig, "localhost", 8080, "http", "/metrics", 15, 15, 0, 0, targetInfo); APSARA_TEST_EQUAL(event.GetId(), "test_joblocalhost:8080887d0db7cce49fc7"); } void ScrapeSchedulerUnittest::TestProcess() { EventPool eventPool{true}; Labels labels; labels.Set(prometheus::ADDRESS_LABEL_NAME, "localhost:8080"); labels.Set(prometheus::ADDRESS_LABEL_NAME, "localhost:8080"); PromTargetInfo targetInfo; targetInfo.mLabels = labels; targetInfo.mHash = "test_hash"; ScrapeScheduler event(mScrapeConfig, "localhost", 8080, "http", "/metrics", 15, 15, 0, 0, targetInfo); auto streamScraper = prom::StreamScraper(labels, 0, 0, event.GetId(), nullptr, std::chrono::system_clock::now()); HttpResponse httpResponse = HttpResponse(&streamScraper, [](void*) {}, prom::StreamScraper::MetricWriteCallback); auto defaultLabels = MetricLabels(); event.InitSelfMonitor(defaultLabels); // APSARA_TEST_EQUAL(event.GetId(), "test_jobhttp://localhost:8080/metrics" + ToString(labels.Hash())); // if status code is not 200, no data will be processed // but will continue running, sending self-monitoring metrics httpResponse.SetStatusCode(503); httpResponse.SetNetworkStatus(NetworkCode::Ok, ""); event.OnMetricResult(httpResponse, 0); APSARA_TEST_EQUAL(1UL, streamScraper.mItem.size()); streamScraper.mItem.clear(); httpResponse.SetStatusCode(503); httpResponse.SetNetworkStatus(GetNetworkStatus(CURLE_COULDNT_CONNECT), ""); event.OnMetricResult(httpResponse, 0); APSARA_TEST_EQUAL( streamScraper.mItem[0]->mEventGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_STATE).to_string(), "ERR_CONN_FAILED"); APSARA_TEST_EQUAL(1UL, streamScraper.mItem.size()); streamScraper.mItem.clear(); httpResponse.SetStatusCode(200); httpResponse.SetNetworkStatus(NetworkCode::Ok, ""); string body1 = "# HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles.\n" "# TYPE go_gc_duration_seconds summary\n" "go_gc_duration_seconds{quantile=\"0\"} 1.5531e-05\n" "go_gc_duration_seconds{quantile=\"0.25\"} 3.9357e-05\n" "go_gc_duration_seconds{quantile=\"0.5\"} 4.1114e-05\n" "go_gc_duration_seconds{quantile=\"0.75\"} 4.3372e-05\n" "go_gc_duration_seconds{quantile=\"1\"} 0.000112326\n" "go_gc_duration_seconds_sum 0.034885631\n" "go_gc_duration_seconds_count 850\n" "# HELP go_goroutines Number of goroutines that currently exist.\n" "# TYPE go_goroutines gauge\n" "go_goroutines 7\n" "# HELP go_info Information about the Go environment.\n" "# TYPE go_info gauge\n" "go_info{version=\"go1.22.3\"} 1\n" "# HELP go_memstats_alloc_bytes Number of bytes allocated and still in use.\n" "# TYPE go_memstats_alloc_bytes gauge\n" "go_memstats_alloc_bytes 6.742688e+06\n" "# HELP go_memstats_alloc_bytes_total Total number of bytes allocated, even if freed.\n" "# TYPE go_memstats_alloc_bytes_total counter\n" "go_memstats_alloc_bytes_total 1.5159292e+08"; prom::StreamScraper::MetricWriteCallback( body1.data(), (size_t)1, (size_t)body1.length(), (void*)httpResponse.GetBody<prom::StreamScraper>()); event.OnMetricResult(httpResponse, 0); APSARA_TEST_EQUAL(1UL, streamScraper.mItem.size()); APSARA_TEST_EQUAL(11UL, streamScraper.mItem[0]->mEventGroup.GetEvents().size()); } void ScrapeSchedulerUnittest::TestStreamMetricWriteCallback() { EventPool eventPool{true}; Labels labels; labels.Set(prometheus::ADDRESS_LABEL_NAME, "localhost:8080"); labels.Set(prometheus::ADDRESS_LABEL_NAME, "localhost:8080"); PromTargetInfo targetInfo; targetInfo.mLabels = labels; targetInfo.mHash = "test_hash"; ScrapeScheduler event(mScrapeConfig, "localhost", 8080, "http", "/metrics", 15, 15, 0, 0, targetInfo); auto streamScraper = prom::StreamScraper(labels, 0, 0, event.GetId(), nullptr, std::chrono::system_clock::now()); HttpResponse httpResponse = HttpResponse(&streamScraper, [](void*) {}, prom::StreamScraper::MetricWriteCallback); // APSARA_TEST_EQUAL(event.GetId(), "test_jobhttp://localhost:8080/metrics" + ToString(labels.Hash())); string body1 = "# HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles.\n" "# TYPE go_gc_duration_seconds summary\n" "go_gc_duration_seconds{quantile=\"0\"} 1.5531e-05\n" "go_gc_duration_seconds{quantile=\"0.25\"} 3.9357e-05\n" "go_gc_duration_seconds{quantile=\"0.5\"} 4.1114e-05\n" "go_gc_duration_seconds{quantile=\"0.75\"} 4.3372e-05\n" "go_gc_duration_seconds{quantile=\"1\"} 0.000112326\n" "go_gc_duration_seconds_sum 0.034885631\n" "go_gc_duration_seconds_count 850\n" "# HELP go_goroutines Number of goroutines t" "hat currently exist.\n" "# TYPE go_goroutines gauge\n" "go_go"; string body2 = "routines 7\n" "# HELP go_info Information about the Go environment.\n" "# TYPE go_info gauge\n" "go_info{version=\"go1.22.3\"} 1\n" "# HELP go_memstats_alloc_bytes Number of bytes allocated and still in use.\n" "# TYPE go_memstats_alloc_bytes gauge\n" "go_memstats_alloc_bytes 6.742688e+06\n" "# HELP go_memstats_alloc_bytes_total Total number of bytes allocated, even if freed.\n" "# TYPE go_memstats_alloc_bytes_total counter\n" "go_memstats_alloc_bytes_total 1.5159292e+08"; prom::StreamScraper::MetricWriteCallback( body1.data(), (size_t)1, (size_t)body1.length(), (void*)httpResponse.GetBody<prom::StreamScraper>()); auto& res = httpResponse.GetBody<prom::StreamScraper>()->mEventGroup; APSARA_TEST_EQUAL(7UL, res.GetEvents().size()); APSARA_TEST_EQUAL("go_gc_duration_seconds{quantile=\"0\"} 1.5531e-05", res.GetEvents()[0].Cast<RawEvent>().GetContent()); APSARA_TEST_EQUAL("go_gc_duration_seconds{quantile=\"0.25\"} 3.9357e-05", res.GetEvents()[1].Cast<RawEvent>().GetContent()); APSARA_TEST_EQUAL("go_gc_duration_seconds{quantile=\"0.5\"} 4.1114e-05", res.GetEvents()[2].Cast<RawEvent>().GetContent()); APSARA_TEST_EQUAL("go_gc_duration_seconds{quantile=\"0.75\"} 4.3372e-05", res.GetEvents()[3].Cast<RawEvent>().GetContent()); APSARA_TEST_EQUAL("go_gc_duration_seconds{quantile=\"1\"} 0.000112326", res.GetEvents()[4].Cast<RawEvent>().GetContent()); APSARA_TEST_EQUAL("go_gc_duration_seconds_sum 0.034885631", res.GetEvents()[5].Cast<RawEvent>().GetContent()); APSARA_TEST_EQUAL("go_gc_duration_seconds_count 850", res.GetEvents()[6].Cast<RawEvent>().GetContent()); // httpResponse.GetBody<MetricResponseBody>()->mEventGroup = PipelineEventGroup(std::make_shared<SourceBuffer>()); prom::StreamScraper::MetricWriteCallback( body2.data(), (size_t)1, (size_t)body2.length(), (void*)httpResponse.GetBody<prom::StreamScraper>()); httpResponse.GetBody<prom::StreamScraper>()->FlushCache(); APSARA_TEST_EQUAL(11UL, res.GetEvents().size()); APSARA_TEST_EQUAL("go_goroutines 7", res.GetEvents()[7].Cast<RawEvent>().GetContent()); APSARA_TEST_EQUAL("go_info{version=\"go1.22.3\"} 1", res.GetEvents()[8].Cast<RawEvent>().GetContent()); APSARA_TEST_EQUAL("go_memstats_alloc_bytes 6.742688e+06", res.GetEvents()[9].Cast<RawEvent>().GetContent()); APSARA_TEST_EQUAL("go_memstats_alloc_bytes_total 1.5159292e+08", res.GetEvents()[10].Cast<RawEvent>().GetContent()); } void ScrapeSchedulerUnittest::TestReceiveMessage() { Labels labels; labels.Set(prometheus::ADDRESS_LABEL_NAME, "localhost:8080"); labels.Set(prometheus::ADDRESS_LABEL_NAME, "localhost:8080"); PromTargetInfo targetInfo; targetInfo.mLabels = labels; targetInfo.mHash = "test_hash"; auto event = make_shared<ScrapeScheduler>(mScrapeConfig, "localhost", 8080, "http", "/metrics", 15, 15, 0, 0, targetInfo); // before APSARA_TEST_EQUAL(true, event->IsCancelled()); // after APSARA_TEST_EQUAL(false, event->IsCancelled()); } void ScrapeSchedulerUnittest::TestScheduler() { Labels labels; labels.Set(prometheus::ADDRESS_LABEL_NAME, "localhost:8080"); PromTargetInfo targetInfo; targetInfo.mLabels = labels; targetInfo.mHash = "test_hash"; ScrapeScheduler event(mScrapeConfig, "localhost", 8080, "http", "/metrics", 15, 15, 0, 0, targetInfo); auto timer = make_shared<Timer>(); EventPool eventPool{true}; event.SetComponent(&eventPool); event.ScheduleNext(); APSARA_TEST_TRUE(Timer::GetInstance()->mQueue.size() == 1); event.Cancel(); APSARA_TEST_TRUE(event.mValidState == false); APSARA_TEST_TRUE(event.mFuture->mState == PromFutureState::Done); } void ScrapeSchedulerUnittest::TestTokenUpdate() { Labels labels; labels.Set(prometheus::ADDRESS_LABEL_NAME, "localhost:8080"); PromTargetInfo targetInfo; targetInfo.mLabels = labels; targetInfo.mHash = "test_hash"; ScrapeScheduler event(mScrapeConfig, "localhost", 8080, "http", "/metrics", 15, 15, 0, 0, targetInfo); EventPool eventPool{true}; event.SetComponent(&eventPool); event.SetFirstExecTime(chrono::steady_clock::now(), chrono::system_clock::now()); event.ScheduleNext(); auto streamScraper = prom::StreamScraper(labels, 0, 0, event.GetId(), nullptr, std::chrono::system_clock::now()); HttpResponse httpResponse = HttpResponse(&streamScraper, [](void*) {}, prom::StreamScraper::MetricWriteCallback); auto defaultLabels = MetricLabels(); event.InitSelfMonitor(defaultLabels); httpResponse.SetStatusCode(401); httpResponse.SetNetworkStatus(NetworkCode::Other, ""); // success update event.mFuture->Process(httpResponse, 0); auto curTime = chrono::system_clock::now(); APSARA_TEST_TRUE(chrono::duration_cast<chrono::seconds>(curTime - mScrapeConfig->mLastUpdateTime) <= chrono::seconds(1)); // fail update, no change curTime = mScrapeConfig->mLastUpdateTime; this_thread::sleep_for(chrono::milliseconds(10)); event.mFuture->Process(httpResponse, 0); APSARA_TEST_EQUAL(curTime, mScrapeConfig->mLastUpdateTime); // success update mScrapeConfig->mLastUpdateTime = curTime - chrono::minutes(6); curTime = chrono::system_clock::now(); this_thread::sleep_for(chrono::milliseconds(10)); event.mFuture->Process(httpResponse, 0); APSARA_TEST_TRUE(chrono::duration_cast<chrono::seconds>(curTime - mScrapeConfig->mLastUpdateTime) <= chrono::seconds(1)); } void ScrapeSchedulerUnittest::TestQueueIsFull() { Labels labels; labels.Set(prometheus::ADDRESS_LABEL_NAME, "localhost:8080"); PromTargetInfo targetInfo; targetInfo.mLabels = labels; targetInfo.mHash = "test_hash"; ScrapeScheduler event(mScrapeConfig, "localhost", 8080, "http", "/metrics", 15, 15, 0, 0, targetInfo); auto defaultLabels = MetricLabels(); event.InitSelfMonitor(defaultLabels); EventPool eventPool{true}; event.SetComponent(&eventPool); auto now = std::chrono::steady_clock::now(); auto nowScrape = std::chrono::system_clock::now(); event.SetFirstExecTime(now, nowScrape); event.ScheduleNext(); APSARA_TEST_TRUE(Timer::GetInstance()->mQueue.size() == 1); const auto& e = Timer::GetInstance()->mQueue.top(); APSARA_TEST_EQUAL(now, e->GetExecTime()); APSARA_TEST_FALSE(e->IsValid()); Timer::GetInstance()->mQueue.pop(); // queue is full, so it should schedule next after 1 second APSARA_TEST_EQUAL(1UL, Timer::GetInstance()->mQueue.size()); const auto& next = Timer::GetInstance()->mQueue.top(); APSARA_TEST_EQUAL(now + std::chrono::seconds(1), next->GetExecTime()); } void ScrapeSchedulerUnittest::TestExactlyScrape() { Labels labels; labels.Set(prometheus::ADDRESS_LABEL_NAME, "localhost:8080"); PromTargetInfo targetInfo; targetInfo.mLabels = labels; targetInfo.mHash = "test_hash"; ScrapeScheduler event(mScrapeConfig, "localhost", 8080, "http", "/metrics", 10, 10, 0, 0, targetInfo); auto defaultLabels = MetricLabels(); event.InitSelfMonitor(defaultLabels); EventPool eventPool{true}; event.SetComponent(&eventPool); auto execTime = std::chrono::steady_clock::now(); auto scrapeTime = std::chrono::system_clock::now(); event.SetFirstExecTime(execTime, scrapeTime); auto firstScrapeTime = event.mLatestScrapeTime; event.ExecDone(); auto secondScrapeTime = event.mLatestScrapeTime; event.ExecDone(); event.DelayExecTime(1); auto thirdScrapeTime = event.mLatestScrapeTime; event.ExecDone(); auto fourthScrapeTime = event.mLatestScrapeTime; APSARA_TEST_EQUAL(firstScrapeTime, scrapeTime); APSARA_TEST_EQUAL(secondScrapeTime - firstScrapeTime, std::chrono::seconds(mScrapeConfig->mScrapeIntervalSeconds)); APSARA_TEST_EQUAL(thirdScrapeTime - firstScrapeTime, std::chrono::seconds(mScrapeConfig->mScrapeIntervalSeconds * 2 + 1)); APSARA_TEST_EQUAL(fourthScrapeTime - firstScrapeTime, std::chrono::seconds(mScrapeConfig->mScrapeIntervalSeconds * 3)); } UNIT_TEST_CASE(ScrapeSchedulerUnittest, TestInitscrapeScheduler) UNIT_TEST_CASE(ScrapeSchedulerUnittest, TestProcess) UNIT_TEST_CASE(ScrapeSchedulerUnittest, TestStreamMetricWriteCallback) UNIT_TEST_CASE(ScrapeSchedulerUnittest, TestScheduler) UNIT_TEST_CASE(ScrapeSchedulerUnittest, TestQueueIsFull) UNIT_TEST_CASE(ScrapeSchedulerUnittest, TestExactlyScrape) UNIT_TEST_CASE(ScrapeSchedulerUnittest, TestTokenUpdate) } // namespace logtail UNIT_TEST_MAIN