core/plugin/flusher/file/FlusherFile.cpp (63 lines of code) (raw):
// Copyright 2024 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "plugin/flusher/file/FlusherFile.h"
#include "spdlog/async.h"
#include "spdlog/sinks/rotating_file_sink.h"
#include "collection_pipeline/queue/SenderQueueManager.h"
using namespace std;
namespace logtail {
const string FlusherFile::sName = "flusher_file";
bool FlusherFile::Init(const Json::Value& config, Json::Value& optionalGoPipeline) {
static uint32_t cnt = 0;
GenerateQueueKey(to_string(++cnt));
SenderQueueManager::GetInstance()->CreateQueue(mQueueKey, mPluginID, *mContext);
string errorMsg;
// FilePath
if (!GetMandatoryStringParam(config, "FilePath", mFilePath, errorMsg)) {
PARAM_ERROR_RETURN(mContext->GetLogger(),
mContext->GetAlarm(),
errorMsg,
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
}
// MaxFileSize
GetMandatoryUIntParam(config, "MaxFileSize", mMaxFileSize, errorMsg);
// MaxFiles
GetMandatoryUIntParam(config, "MaxFiles", mMaxFiles, errorMsg);
// create file writer
auto threadPool = std::make_shared<spdlog::details::thread_pool>(10, 1);
auto fileSink = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(mFilePath, mMaxFileSize, mMaxFiles, true);
mFileWriter
= std::make_shared<spdlog::async_logger>(sName, fileSink, threadPool, spdlog::async_overflow_policy::block);
mFileWriter->set_pattern("%v");
mGroupSerializer = make_unique<JsonEventGroupSerializer>(this);
mSendCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_OUT_EVENT_GROUPS_TOTAL);
return true;
}
bool FlusherFile::Send(PipelineEventGroup&& g) {
return SerializeAndPush(std::move(g));
}
bool FlusherFile::Flush(size_t key) {
return true;
}
bool FlusherFile::FlushAll() {
return true;
}
bool FlusherFile::SerializeAndPush(PipelineEventGroup&& group) {
string serializedData;
string errorMsg;
BatchedEvents g(std::move(group.MutableEvents()),
std::move(group.GetSizedTags()),
std::move(group.GetSourceBuffer()),
group.GetMetadata(EventGroupMetaKey::SOURCE_ID),
std::move(group.GetExactlyOnceCheckpoint()));
mGroupSerializer->DoSerialize(std::move(g), serializedData, errorMsg);
if (errorMsg.empty()) {
if (!serializedData.empty() && serializedData.back() == '\n') {
serializedData.pop_back();
}
mFileWriter->info(std::move(serializedData));
mFileWriter->flush();
} else {
LOG_ERROR(sLogger, ("serialize pipeline event group error", errorMsg));
}
return true;
}
} // namespace logtail