core/runner/FlusherRunner.cpp (174 lines of code) (raw):
// Copyright 2024 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "runner/FlusherRunner.h"
#include "app_config/AppConfig.h"
#include "application/Application.h"
#include "collection_pipeline/plugin/interface/HttpFlusher.h"
#include "collection_pipeline/queue/QueueKeyManager.h"
#include "collection_pipeline/queue/SenderQueueItem.h"
#include "collection_pipeline/queue/SenderQueueManager.h"
#include "common/LogtailCommonFlags.h"
#include "common/StringTools.h"
#include "common/http/HttpRequest.h"
#include "logger/Logger.h"
#include "monitor/AlarmManager.h"
#include "plugin/flusher/sls/DiskBufferWriter.h"
#include "runner/sink/http/HttpSink.h"
// Seconds FlusherRunner::Stop() waits on the runner thread's future before logging that it was forced to stop.
DEFINE_FLAG_INT32(flusher_runner_exit_timeout_sec, "", 60);
// Defined elsewhere. Used in PushToHttpSink() as the maximum age (in seconds since the item was first enqueued)
// for which an item whose request could not be built is kept for retry instead of being discarded.
DECLARE_FLAG_INT32(discard_send_fail_interval);
using namespace std;
namespace logtail {
/// @brief Loads the module configuration, registers hot-reload for max_bytes_per_sec, prepares the runner
///        metrics and starts the background Run() loop.
/// @return Always true.
bool FlusherRunner::Init() {
    LoadModuleConfig(true);
    // Re-run LoadModuleConfig(false) whenever max_bytes_per_sec is changed at runtime.
    mCallback = [this]() { return LoadModuleConfig(false); };
    AppConfig::GetInstance()->RegisterCallback("max_bytes_per_sec", &mCallback);

    WriteMetrics::GetInstance()->PrepareMetricsRecordRef(
        mMetricsRecordRef,
        MetricCategory::METRIC_CATEGORY_RUNNER,
        {{METRIC_LABEL_KEY_RUNNER_NAME, METRIC_LABEL_VALUE_RUNNER_NAME_FLUSHER}});
    mInItemsTotal = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_IN_ITEMS_TOTAL);
    mInItemDataSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_IN_SIZE_BYTES);
    mOutItemsTotal = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_OUT_ITEMS_TOTAL);
    mTotalDelayMs = mMetricsRecordRef.CreateTimeCounter(METRIC_RUNNER_TOTAL_DELAY_MS);
    mLastRunTime = mMetricsRecordRef.CreateIntGauge(METRIC_RUNNER_LAST_RUN_TIME);
    mInItemRawDataSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_FLUSHER_IN_RAW_SIZE_BYTES);
    mWaitingItemsTotal = mMetricsRecordRef.CreateIntGauge(METRIC_RUNNER_FLUSHER_WAITING_ITEMS_TOTAL);

    // Reset all state the worker thread may read BEFORE starting it. Stop() leaves mIsFlush == true; if it were
    // reset only after launching Run(), a re-initialized runner could observe the stale value while the queues
    // are empty and exit immediately (and a non-atomic flag would be a data race).
    mLastCheckSendClientTime = time(nullptr);
    mIsFlush = false;
    mThreadRes = async(launch::async, &FlusherRunner::Run, this);
    return true;
}
/// @brief Merges max_bytes_per_sec from the application config (validated against the current value and
///        kDefaultMaxSendBytePerSec), stores it back, and refreshes the send flow-control setting.
/// @param isInit True on the initial load from Init(), false on a hot reload.
/// @return Always true.
bool FlusherRunner::LoadModuleConfig(bool isInit) {
    // Only max_bytes_per_sec is constrained: values below 1 MiB are rejected. Every other key is accepted.
    auto validate = [](const std::string& key, const int32_t value) -> bool {
        if (key != "max_bytes_per_sec") {
            return true;
        }
        return value >= static_cast<int32_t>(1024 * 1024);
    };

    if (isInit) {
        // Parameters that must not be hot-reloaded belong here; there are none at the moment.
    }

    auto appConfig = AppConfig::GetInstance();
    auto mergedMaxBytePerSec = appConfig->MergeInt32(
        kDefaultMaxSendBytePerSec, appConfig->GetMaxBytePerSec(), "max_bytes_per_sec", validate);
    appConfig->SetMaxBytePerSec(mergedMaxBytePerSec);
    UpdateSendFlowControl();
    return true;
}
/// @brief Enables or disables the per-item send rate limiter used by Run() according to the current
///        max_bytes_per_sec value, and logs the resulting setting.
void FlusherRunner::UpdateSendFlowControl() {
    // When inflow exceeds 30 MB/s, FlowControl loses precision, so the limiter is turned off at or above it.
    static constexpr int32_t kRateLimiterPrecisionLimit = 30 * 1024 * 1024;
    // Recompute the flag on every call instead of only ever clearing it: max_bytes_per_sec is hot-reloadable,
    // and lowering it back below the threshold must re-enable the limiter.
    mEnableRateLimiter = AppConfig::GetInstance()->GetMaxBytePerSec() < kRateLimiterPrecisionLimit;
    LOG_INFO(sLogger,
             ("send byte per second limit", AppConfig::GetInstance()->GetMaxBytePerSec())(
                 "send flow control", mEnableRateLimiter ? "enable" : "disable"));
}
void FlusherRunner::Stop() {
mIsFlush = true;
SenderQueueManager::GetInstance()->Trigger();
if (!mThreadRes.valid()) {
return;
}
future_status s = mThreadRes.wait_for(chrono::seconds(INT32_FLAG(flusher_runner_exit_timeout_sec)));
if (s == future_status::ready) {
LOG_INFO(sLogger, ("flusher runner", "stopped successfully"));
} else {
LOG_WARNING(sLogger, ("flusher runner", "forced to stopped"));
}
}
/// @brief Decrements the count of HTTP requests handed to the sink and wakes the sender queues so waiting
///        work can proceed.
void FlusherRunner::DecreaseHttpSendingCnt() {
    mHttpSendingCnt.fetch_sub(1);
    auto queueManager = SenderQueueManager::GetInstance();
    queueManager->Trigger();
}
/// @brief Builds an HTTP request for @p item via its HttpFlusher and hands it to HttpSink.
/// @param item      Sender-queue item whose flusher is an HttpFlusher.
/// @param withLimit When true, first wait (polling every 10 ms) until GetSendingBufferCount() drops below the
///                  global send-request concurrency, unless the application is exiting.
/// If the request cannot be built, the item is either marked IDLE for a later retry (when the flusher asks to
/// keep it and it is younger than discard_send_fail_interval seconds) or removed from its queue.
void FlusherRunner::PushToHttpSink(SenderQueueItem* item, bool withLimit) {
    // TODO: use semaphore instead
    while (withLimit && !Application::GetInstance()->IsExiting()
           && GetSendingBufferCount() >= AppConfig::GetInstance()->GetSendRequestGlobalConcurrency()) {
        this_thread::sleep_for(chrono::milliseconds(10));
    }

    unique_ptr<HttpSinkRequest> req;
    bool keepItem = false;
    string errMsg;
    if (!static_cast<HttpFlusher*>(item->mFlusher)->BuildRequest(item, req, &keepItem, &errMsg)) {
        // Either way, the queue's in-sending count is decreased since this item is not being sent now.
        // errMsg is included in the logs so the build failure reason is not lost.
        if (keepItem
            && chrono::duration_cast<chrono::seconds>(chrono::system_clock::now() - item->mFirstEnqueTime).count()
                < INT32_FLAG(discard_send_fail_interval)) {
            item->mStatus = SendingStatus::IDLE;
            LOG_TRACE(sLogger,
                      ("failed to build request", "retry later")("error", errMsg)("item address", item)(
                          "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(item->mQueueKey)));
            SenderQueueManager::GetInstance()->DecreaseConcurrencyLimiterInSendingCnt(item->mQueueKey);
        } else {
            LOG_WARNING(sLogger,
                        ("failed to build request", "discard item")("error", errMsg)("item address", item)(
                            "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(item->mQueueKey)));
            SenderQueueManager::GetInstance()->DecreaseConcurrencyLimiterInSendingCnt(item->mQueueKey);
            SenderQueueManager::GetInstance()->RemoveItem(item->mQueueKey, item);
        }
        return;
    }

    req->mEnqueTime = item->mLastSendTime = chrono::system_clock::now();
    // Count the request as in flight BEFORE handing it over. Its completion is reported through
    // DecreaseHttpSendingCnt() (presumably from the sink's thread); incrementing only after AddRequest() lets that
    // decrement run first, so the count of requests handed to the sink is transiently too low.
    ++mHttpSendingCnt;
    LOG_TRACE(sLogger,
              ("send item to http sink, item address", item)(
                  "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(item->mQueueKey))(
                  "sending cnt", ToString(mHttpSendingCnt.load())));
    HttpSink::GetInstance()->AddRequest(std::move(req));
}
void FlusherRunner::Run() {
LOG_INFO(sLogger, ("flusher runner", "started"));
while (true) {
auto curTime = chrono::system_clock::now();
SET_GAUGE(mLastRunTime, chrono::duration_cast<chrono::seconds>(curTime.time_since_epoch()).count());
vector<SenderQueueItem*> items;
int32_t limit = Application::GetInstance()->IsExiting()
? -1
: AppConfig::GetInstance()->GetSendRequestGlobalConcurrency();
SenderQueueManager::GetInstance()->GetAvailableItems(items, limit);
if (items.empty()) {
SenderQueueManager::GetInstance()->Wait(1000);
} else {
LOG_TRACE(sLogger, ("got items from sender queue, cnt", items.size()));
for (auto itr = items.begin(); itr != items.end(); ++itr) {
ADD_COUNTER(mInItemDataSizeBytes, (*itr)->mData.size());
ADD_COUNTER(mInItemRawDataSizeBytes, (*itr)->mRawSize);
}
ADD_COUNTER(mInItemsTotal, items.size());
ADD_GAUGE(mWaitingItemsTotal, items.size());
}
for (auto itr = items.begin(); itr != items.end(); ++itr) {
LOG_TRACE(
sLogger,
("got item from sender queue, item address",
*itr)("config-flusher-dst", QueueKeyManager::GetInstance()->GetName((*itr)->mQueueKey))(
"wait time",
ToString(chrono::duration_cast<chrono::milliseconds>(curTime - (*itr)->mFirstEnqueTime).count())
+ "ms")("try cnt", ToString((*itr)->mTryCnt)));
// TODO: use rate limiter instead
if (!Application::GetInstance()->IsExiting() && mEnableRateLimiter) {
RateLimiter::FlowControl((*itr)->mRawSize, mSendLastTime, mSendLastByte, true);
}
Dispatch(*itr);
SUB_GAUGE(mWaitingItemsTotal, 1);
ADD_COUNTER(mOutItemsTotal, 1);
ADD_COUNTER(mTotalDelayMs, chrono::system_clock::now() - curTime);
}
if (mIsFlush && SenderQueueManager::GetInstance()->IsAllQueueEmpty()) {
break;
}
}
}
/// @brief Routes one sender-queue item according to its flusher's sink type.
/// Non-HTTP items are removed from their queue. HTTP items are sent through PushToHttpSink(). The exception is
/// when the application is exiting without enable_full_drain_mode and the flusher is "flusher_sls": those items
/// are passed to DiskBufferWriter and then removed from their queue.
void FlusherRunner::Dispatch(SenderQueueItem* item) {
    if (item->mFlusher->GetSinkType() != SinkType::HTTP) {
        SenderQueueManager::GetInstance()->RemoveItem(item->mQueueKey, item);
        return;
    }

    // TODO: make it common for all http flushers
    const bool divertToDiskBuffer = !BOOL_FLAG(enable_full_drain_mode) && Application::GetInstance()->IsExiting()
        && item->mFlusher->Name() == "flusher_sls";
    if (!divertToDiskBuffer) {
        PushToHttpSink(item);
        return;
    }
    DiskBufferWriter::GetInstance()->PushToDiskBuffer(item, 3);
    SenderQueueManager::GetInstance()->RemoveItem(item->mQueueKey, item);
}
} // namespace logtail