core/runner/ProcessorRunner.cpp
// Copyright 2024 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "runner/ProcessorRunner.h"
#include "app_config/AppConfig.h"
#include "batch/TimeoutFlushManager.h"
#include "collection_pipeline/CollectionPipelineManager.h"
#include "common/Flags.h"
#include "go_pipeline/LogtailPlugin.h"
#include "models/EventPool.h"
#include "monitor/AlarmManager.h"
#include "monitor/metric_constants/MetricConstants.h"
#include "queue/ProcessQueueManager.h"
#include "queue/QueueKeyManager.h"

DEFINE_FLAG_INT32(default_flush_merged_buffer_interval, "interval in seconds for flushing merged batch buffers", 1);
DEFINE_FLAG_INT32(processor_runner_exit_timeout_sec, "max seconds to wait for a processor thread to exit on stop", 60);

DECLARE_FLAG_INT32(max_send_log_group_size);
using namespace std;
namespace logtail {
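
// Per-thread metric handles; each worker thread initializes its own copies at the top of Run().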
thread_local uint32_t ProcessorRunner::sThreadNo;
thread_local MetricsRecordRef ProcessorRunner::sMetricsRecordRef;
thread_local CounterPtr ProcessorRunner::sInGroupsCnt;
thread_local CounterPtr ProcessorRunner::sInEventsCnt;
thread_local CounterPtr ProcessorRunner::sInGroupDataSizeBytes;
thread_local IntGaugePtr ProcessorRunner::sLastRunTime;
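
// The worker count is fixed at construction from AppConfig; mThreadRes holds one future per worker.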
ProcessorRunner::ProcessorRunner()
: mThreadCount(AppConfig::GetInstance()->GetProcessThreadCount()), mThreadRes(mThreadCount) {
}
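
// Spawns one processing worker per configured thread; each worker executes Run() until Stop() drains the queues.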
void ProcessorRunner::Init() {
    // Reset the flush flag before spawning workers so that a runner restarted after Stop() does not exit immediately.
    mIsFlush = false;
    for (uint32_t threadNo = 0; threadNo < mThreadCount; ++threadNo) {
        mThreadRes[threadNo] = async(launch::async, &ProcessorRunner::Run, this, threadNo);
    }
}
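
// Requests shutdown: marks the runner as flushing, wakes the workers, and gives each worker thread up to
// processor_runner_exit_timeout_sec seconds to drain its queues and exit.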
void ProcessorRunner::Stop() {
mIsFlush = true;
ProcessQueueManager::GetInstance()->Trigger();
for (uint32_t threadNo = 0; threadNo < mThreadCount; ++threadNo) {
if (!mThreadRes[threadNo].valid()) {
continue;
}
future_status s = mThreadRes[threadNo].wait_for(chrono::seconds(INT32_FLAG(processor_runner_exit_timeout_sec)));
if (s == future_status::ready) {
LOG_INFO(sLogger, ("processor runner", "stopped successfully")("threadNo", threadNo));
} else {
LOG_WARNING(sLogger, ("processor runner", "forced to stopped")("threadNo", threadNo));
}
}
}
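
// Pushes an event group onto the process queue identified by key, retrying up to retryTimes times at 10 ms
// intervals. On failure the group is handed back to the caller through the rvalue-reference parameter.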
bool ProcessorRunner::PushQueue(QueueKey key, size_t inputIndex, PipelineEventGroup&& group, uint32_t retryTimes) {
unique_ptr<ProcessQueueItem> item = make_unique<ProcessQueueItem>(std::move(group), inputIndex);
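    // Note: this relies on ProcessQueueManager::PushQueue taking the item by rvalue reference and only consuming it
    // when it returns OK; otherwise retrying with std::move(item) and recovering the group below would not be safe.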
for (size_t i = 0; i < retryTimes; ++i) {
if (ProcessQueueManager::GetInstance()->PushQueue(key, std::move(item)) == QueueStatus::OK) {
return true;
}
        if (i % 100 == 0) {
            LOG_WARNING(sLogger,
                        ("push attempts to the process queue have failed for the past second",
                         "retry again")("config", QueueKeyManager::GetInstance()->GetName(key))(
                            "input index", ToString(inputIndex)));
        }
this_thread::sleep_for(chrono::milliseconds(10));
}
group = std::move(item->mEventGroup);
return false;
}
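
// Worker loop: pops items from the process queues, runs them through the owning pipeline, and forwards the results
// either via the pipeline's Send() or, for configs flushing through the Go pipeline, via LogtailPlugin.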
void ProcessorRunner::Run(uint32_t threadNo) {
LOG_INFO(sLogger, ("processor runner", "started")("thread no", threadNo));
// thread local metrics should be initialized in each thread
sThreadNo = threadNo;
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(
sMetricsRecordRef,
MetricCategory::METRIC_CATEGORY_RUNNER,
{{METRIC_LABEL_KEY_RUNNER_NAME, METRIC_LABEL_VALUE_RUNNER_NAME_PROCESSOR},
{METRIC_LABEL_KEY_THREAD_NO, ToString(threadNo)}});
sInGroupsCnt = sMetricsRecordRef.CreateCounter(METRIC_RUNNER_IN_EVENT_GROUPS_TOTAL);
sInEventsCnt = sMetricsRecordRef.CreateCounter(METRIC_RUNNER_IN_EVENTS_TOTAL);
sInGroupDataSizeBytes = sMetricsRecordRef.CreateCounter(METRIC_RUNNER_IN_SIZE_BYTES);
sLastRunTime = sMetricsRecordRef.CreateIntGauge(METRIC_RUNNER_LAST_RUN_TIME);
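    // Shared across worker threads, but only thread 0 reads or writes it (the loop below checks threadNo == 0 first),
    // so no synchronization is needed.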
static int32_t lastFlushBatchTime = 0;
while (true) {
int32_t curTime = time(nullptr);
if (threadNo == 0 && curTime - lastFlushBatchTime >= INT32_FLAG(default_flush_merged_buffer_interval)) {
TimeoutFlushManager::GetInstance()->FlushTimeoutBatch();
lastFlushBatchTime = curTime;
}
SET_GAUGE(sLastRunTime, curTime);
unique_ptr<ProcessQueueItem> item;
string configName;
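        // Wait briefly when no item is available; exit the loop only once a flush has been requested and every queue
        // has drained.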
if (!ProcessQueueManager::GetInstance()->PopItem(threadNo, item, configName)) {
if (mIsFlush && ProcessQueueManager::GetInstance()->IsAllQueueEmpty()) {
break;
}
ProcessQueueManager::GetInstance()->Wait(100);
continue;
}
ADD_COUNTER(sInEventsCnt, item->mEventGroup.GetEvents().size());
ADD_COUNTER(sInGroupsCnt, 1);
ADD_COUNTER(sInGroupDataSizeBytes, item->mEventGroup.DataSize());
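        // A queue item may already carry the pipeline it belongs to (e.g., one attached before a config update);
        // otherwise look the current pipeline up by config name.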
shared_ptr<CollectionPipeline>& pipeline = item->mPipeline;
bool hasOldPipeline = pipeline != nullptr;
if (!hasOldPipeline) {
pipeline = CollectionPipelineManager::GetInstance()->FindConfigByName(configName);
}
if (!pipeline) {
LOG_INFO(sLogger,
("pipeline not found during processing, perhaps due to config deletion",
"discard data")("config", configName));
continue;
}
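        // Record whether the group holds log events before it is moved into eventGroupList; only log events can
        // currently be forwarded to the Go pipeline (see the TODO below).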
bool isLog = !item->mEventGroup.GetEvents().empty() && item->mEventGroup.GetEvents()[0].Is<LogEvent>();
vector<PipelineEventGroup> eventGroupList;
eventGroupList.emplace_back(std::move(item->mEventGroup));
pipeline->Process(eventGroupList, item->mInputIndex);
        // If the item carried an old pipeline across a config update, that pipeline object is about to be released;
        // re-fetch the current pipeline by name so that the processed data is sent through the new one.
        if (hasOldPipeline) {
            pipeline = CollectionPipelineManager::GetInstance()->FindConfigByName(configName);
if (!pipeline) {
                LOG_INFO(sLogger,
                         ("pipeline not found after processing, perhaps due to config deletion",
                          "discard data")("config", configName));
continue;
}
}
if (pipeline->IsFlushingThroughGoPipeline()) {
// TODO:
// 1. allow all event types to be sent to Go pipelines
// 2. use event group protobuf instead
if (isLog) {
for (auto& group : eventGroupList) {
string res, errorMsg;
if (!Serialize(group,
pipeline->GetContext().GetGlobalConfig().mEnableTimestampNanosecond,
pipeline->GetContext().GetLogstoreName(),
res,
errorMsg)) {
LOG_WARNING(pipeline->GetContext().GetLogger(),
("failed to serialize event group",
errorMsg)("action", "discard data")("config", configName));
pipeline->GetContext().GetAlarm().SendAlarm(SERIALIZE_FAIL_ALARM,
"failed to serialize event group: " + errorMsg
+ "\taction: discard data\tconfig: "
+ configName,
pipeline->GetContext().GetRegion(),
pipeline->GetContext().GetProjectName(),
configName,
pipeline->GetContext().GetLogstoreName());
continue;
}
LogtailPlugin::GetInstance()->ProcessLogGroup(
pipeline->GetContext().GetConfigName(),
res,
group.GetMetadata(EventGroupMetaKey::SOURCE_ID).to_string());
}
}
} else {
pipeline->Send(std::move(eventGroupList));
}
pipeline->SubInProcessCnt();
gThreadedEventPool.CheckGC();
}
}
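
// Serializes a group of LogEvents into an sls_logs::LogGroup string for forwarding to the Go pipeline. Fails if the
// group contains non-log events or if the serialized size exceeds max_send_log_group_size.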
bool ProcessorRunner::Serialize(
const PipelineEventGroup& group, bool enableNanosecond, const string& logstore, string& res, string& errorMsg) {
sls_logs::LogGroup logGroup;
for (const auto& e : group.GetEvents()) {
if (e.Is<LogEvent>()) {
const auto& logEvent = e.Cast<LogEvent>();
auto log = logGroup.add_logs();
for (const auto& kv : logEvent) {
auto contPtr = log->add_contents();
contPtr->set_key(kv.first.to_string());
contPtr->set_value(kv.second.to_string());
}
log->set_time(logEvent.GetTimestamp());
if (enableNanosecond && logEvent.GetTimestampNanosecond()) {
log->set_time_ns(logEvent.GetTimestampNanosecond().value());
}
} else {
errorMsg = "unsupported event type in event group";
return false;
}
}
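    // The reserved topic tag becomes the LogGroup's topic; all other tags are emitted as LogTags.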
for (const auto& tag : group.GetTags()) {
if (tag.first == LOG_RESERVED_KEY_TOPIC) {
logGroup.set_topic(tag.second.to_string());
} else {
auto logTag = logGroup.add_logtags();
logTag->set_key(tag.first.to_string());
logTag->set_value(tag.second.to_string());
}
}
logGroup.set_category(logstore);
size_t size = logGroup.ByteSizeLong();
    // Compare in size_t to avoid signed overflow when the serialized group is larger than INT32_MAX bytes.
    if (size > static_cast<size_t>(INT32_FLAG(max_send_log_group_size))) {
errorMsg = "log group exceeds size limit\tgroup size: " + ToString(size)
+ "\tsize limit: " + ToString(INT32_FLAG(max_send_log_group_size));
return false;
}
res = logGroup.SerializeAsString();
return true;
}
} // namespace logtail