core/file_server/FileServer.cpp (240 lines of code) (raw):
// Copyright 2023 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "file_server/FileServer.h"
#include "checkpoint/CheckPointManager.h"
#include "common/Flags.h"
#include "common/StringTools.h"
#include "common/TimeUtil.h"
#include "file_server/ConfigManager.h"
#include "file_server/EventDispatcher.h"
#include "file_server/FileTagOptions.h"
#include "file_server/event_handler/LogInput.h"
#include "file_server/polling/PollingDirFile.h"
#include "file_server/polling/PollingModify.h"
#include "plugin/input/InputFile.h"
DEFINE_FLAG_BOOL(enable_polling_discovery, "", true);
using namespace std;
namespace logtail {
FileServer::FileServer() {
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(
mMetricsRecordRef,
MetricCategory::METRIC_CATEGORY_RUNNER,
{{METRIC_LABEL_KEY_RUNNER_NAME, METRIC_LABEL_VALUE_RUNNER_NAME_FILE_SERVER}});
}
// 启动文件服务,包括加载配置、处理检查点、注册事件等
void FileServer::Start() {
ConfigManager::GetInstance()->LoadDockerConfig();
CheckPointManager::Instance()->LoadCheckPoint();
LOG_INFO(sLogger, ("watch dirs", "start"));
auto start = GetCurrentTimeInMilliSeconds();
ConfigManager::GetInstance()->RegisterHandlers();
auto costMs = GetCurrentTimeInMilliSeconds() - start;
if (costMs >= 60 * 1000) {
AlarmManager::GetInstance()->SendAlarm(REGISTER_HANDLERS_TOO_SLOW_ALARM,
"Registering handlers took " + ToString(costMs) + " ms");
LOG_WARNING(sLogger, ("watch dirs", "succeeded")("costMs", costMs));
} else {
LOG_INFO(sLogger, ("watch dirs", "succeeded")("costMs", costMs));
}
EventDispatcher::GetInstance()->AddExistedCheckPointFileEvents();
// the dump time must be reset after dir registration, since it may take long on NFS.
CheckPointManager::Instance()->ResetLastDumpTime();
if (BOOL_FLAG(enable_polling_discovery)) {
PollingModify::GetInstance()->Start();
PollingDirFile::GetInstance()->Start();
}
LogInput::GetInstance()->Start();
LOG_INFO(sLogger, ("file server", "started"));
}
// 暂停文件服务,根据配置更新标志来决定是否要执行相关的清理和保存操作
void FileServer::Pause(bool isConfigUpdate) {
PauseInner();
if (isConfigUpdate) {
EventDispatcher::GetInstance()->DumpAllHandlersMeta(true);
CheckPointManager::Instance()->DumpCheckPointToLocal();
EventDispatcher::GetInstance()->ClearBrokenLinkSet();
PollingDirFile::GetInstance()->ClearCache();
ConfigManager::GetInstance()->ClearFilePipelineMatchCache();
}
}
// 暂停文件服务的内部实现,记录日志并处理暂停逻辑
void FileServer::PauseInner() {
LOG_INFO(sLogger, ("file server pause", "starts"));
// cache must be cleared at last, since logFileReader dump still requires the cache
auto holdOnStart = GetCurrentTimeInMilliSeconds();
if (BOOL_FLAG(enable_polling_discovery)) {
PollingDirFile::GetInstance()->HoldOn();
PollingModify::GetInstance()->HoldOn();
}
LogInput::GetInstance()->HoldOn();
auto holdOnCost = GetCurrentTimeInMilliSeconds() - holdOnStart;
if (holdOnCost >= 60 * 1000) {
AlarmManager::GetInstance()->SendAlarm(HOLD_ON_TOO_SLOW_ALARM,
"Pausing file server took " + ToString(holdOnCost) + "ms");
}
LOG_INFO(sLogger, ("file server pause", "succeeded")("cost", ToString(holdOnCost) + "ms"));
}
// 恢复文件服务,重新注册事件处理程序和恢复日志输入
void FileServer::Resume(bool isConfigUpdate) {
if (isConfigUpdate) {
ClearContainerInfo();
ConfigManager::GetInstance()->DoUpdateContainerPaths();
ConfigManager::GetInstance()->SaveDockerConfig();
}
LOG_INFO(sLogger, ("file server resume", "starts"));
ConfigManager::GetInstance()->RegisterHandlers();
LOG_INFO(sLogger, ("watch dirs", "succeeded"));
if (isConfigUpdate) {
EventDispatcher::GetInstance()->AddExistedCheckPointFileEvents();
}
LogInput::GetInstance()->Resume();
if (BOOL_FLAG(enable_polling_discovery)) {
PollingModify::GetInstance()->Resume();
PollingDirFile::GetInstance()->Resume();
}
LOG_INFO(sLogger, ("file server resume", "succeeded"));
}
// 停止文件服务,将事件处理程序的元数据以及检查点数据保存到本地
void FileServer::Stop() {
PauseInner();
EventDispatcher::GetInstance()->DumpAllHandlersMeta(false);
CheckPointManager::Instance()->DumpCheckPointToLocal();
}
// 获取给定名称的文件发现配置
FileDiscoveryConfig FileServer::GetFileDiscoveryConfig(const string& name) const {
ReadLock lock(mReadWriteLock);
auto itr = mPipelineNameFileDiscoveryConfigsMap.find(name);
if (itr != mPipelineNameFileDiscoveryConfigsMap.end()) {
return itr->second;
}
return make_pair(nullptr, nullptr);
}
// 添加文件发现配置
void FileServer::AddFileDiscoveryConfig(const string& name,
FileDiscoveryOptions* opts,
const CollectionPipelineContext* ctx) {
WriteLock lock(mReadWriteLock);
mPipelineNameFileDiscoveryConfigsMap[name] = make_pair(opts, ctx);
}
// 移除给定名称的文件发现配置
void FileServer::RemoveFileDiscoveryConfig(const string& name) {
WriteLock lock(mReadWriteLock);
mPipelineNameFileDiscoveryConfigsMap.erase(name);
}
// 获取给定名称的文件读取器配置
FileReaderConfig FileServer::GetFileReaderConfig(const string& name) const {
ReadLock lock(mReadWriteLock);
auto itr = mPipelineNameFileReaderConfigsMap.find(name);
if (itr != mPipelineNameFileReaderConfigsMap.end()) {
return itr->second;
}
return make_pair(nullptr, nullptr);
}
// 添加文件读取器配置
void FileServer::AddFileReaderConfig(const string& name,
const FileReaderOptions* opts,
const CollectionPipelineContext* ctx) {
WriteLock lock(mReadWriteLock);
mPipelineNameFileReaderConfigsMap[name] = make_pair(opts, ctx);
}
// 移除给定名称的文件读取器配置
void FileServer::RemoveFileReaderConfig(const string& name) {
WriteLock lock(mReadWriteLock);
mPipelineNameFileReaderConfigsMap.erase(name);
}
// 获取给定名称的多行配置
MultilineConfig FileServer::GetMultilineConfig(const string& name) const {
ReadLock lock(mReadWriteLock);
auto itr = mPipelineNameMultilineConfigsMap.find(name);
if (itr != mPipelineNameMultilineConfigsMap.end()) {
return itr->second;
}
return make_pair(nullptr, nullptr);
}
// 添加多行配置
void FileServer::AddMultilineConfig(const string& name,
const MultilineOptions* opts,
const CollectionPipelineContext* ctx) {
WriteLock lock(mReadWriteLock);
mPipelineNameMultilineConfigsMap[name] = make_pair(opts, ctx);
}
// 移除给定名称的多行配置
void FileServer::RemoveMultilineConfig(const string& name) {
WriteLock lock(mReadWriteLock);
mPipelineNameMultilineConfigsMap.erase(name);
}
// 获取给定名称的Tag配置
FileTagConfig FileServer::GetFileTagConfig(const string& name) const {
ReadLock lock(mReadWriteLock);
auto itr = mPipelineNameFileTagConfigsMap.find(name);
if (itr != mPipelineNameFileTagConfigsMap.end()) {
return itr->second;
}
return make_pair(nullptr, nullptr);
}
// 添加Tag配置
void FileServer::AddFileTagConfig(const std::string& name,
const FileTagOptions* opts,
const CollectionPipelineContext* ctx) {
WriteLock lock(mReadWriteLock);
mPipelineNameFileTagConfigsMap[name] = make_pair(opts, ctx);
}
// 移除给定名称的Tag配置
void FileServer::RemoveFileTagConfig(const string& name) {
WriteLock lock(mReadWriteLock);
mPipelineNameFileTagConfigsMap.erase(name);
}
// 保存容器信息
void FileServer::SaveContainerInfo(const string& pipeline, const shared_ptr<vector<ContainerInfo>>& info) {
WriteLock lock(mReadWriteLock);
mAllContainerInfoMap[pipeline] = info;
}
// 获取并移除给定管道的容器信息
shared_ptr<vector<ContainerInfo>> FileServer::GetAndRemoveContainerInfo(const string& pipeline) {
WriteLock lock(mReadWriteLock);
auto iter = mAllContainerInfoMap.find(pipeline);
if (iter == mAllContainerInfoMap.end()) {
return make_shared<vector<ContainerInfo>>();
}
auto res = iter->second;
mAllContainerInfoMap.erase(iter);
return res;
}
// 清除所有容器信息
void FileServer::ClearContainerInfo() {
WriteLock lock(mReadWriteLock);
mAllContainerInfoMap.clear();
}
// 获取插件的指标管理器
PluginMetricManagerPtr FileServer::GetPluginMetricManager(const std::string& name) const {
ReadLock lock(mReadWriteLock);
auto itr = mPipelineNamePluginMetricManagersMap.find(name);
if (itr != mPipelineNamePluginMetricManagersMap.end()) {
return itr->second;
}
return nullptr;
}
// 添加插件的指标管理器
void FileServer::AddPluginMetricManager(const std::string& name, PluginMetricManagerPtr PluginMetricManager) {
WriteLock lock(mReadWriteLock);
mPipelineNamePluginMetricManagersMap[name] = PluginMetricManager;
}
// 移除插件的指标管理器
void FileServer::RemovePluginMetricManager(const std::string& name) {
WriteLock lock(mReadWriteLock);
mPipelineNamePluginMetricManagersMap.erase(name);
}
// 获取“ReentrantMetricsRecordRef”指标记录对象
ReentrantMetricsRecordRef FileServer::GetOrCreateReentrantMetricsRecordRef(const std::string& name,
MetricLabels& labels) {
PluginMetricManagerPtr filePluginMetricManager = GetPluginMetricManager(name);
if (filePluginMetricManager != nullptr) {
return filePluginMetricManager->GetOrCreateReentrantMetricsRecordRef(labels);
}
return nullptr;
}
// 释放“ReentrantMetricsRecordRef”指标记录对象
void FileServer::ReleaseReentrantMetricsRecordRef(const std::string& name, MetricLabels& labels) {
PluginMetricManagerPtr filePluginMetricManager = GetPluginMetricManager(name);
if (filePluginMetricManager != nullptr) {
filePluginMetricManager->ReleaseReentrantMetricsRecordRef(labels);
}
}
// 获取给定名称的“ExactlyOnce”并发级别
uint32_t FileServer::GetExactlyOnceConcurrency(const string& name) const {
ReadLock lock(mReadWriteLock);
auto itr = mPipelineNameEOConcurrencyMap.find(name);
if (itr != mPipelineNameEOConcurrencyMap.end()) {
return itr->second;
}
return 0;
}
// 获取所有配置了“ExactlyOnce”选项的配置名列表
vector<string> FileServer::GetExactlyOnceConfigs() const {
ReadLock lock(mReadWriteLock);
vector<string> res;
for (const auto& item : mPipelineNameEOConcurrencyMap) {
if (item.second > 0) {
res.push_back(item.first);
}
}
return res;
}
// 添加“ExactlyOnce”并发配置
void FileServer::AddExactlyOnceConcurrency(const string& name, uint32_t concurrency) {
WriteLock lock(mReadWriteLock);
mPipelineNameEOConcurrencyMap[name] = concurrency;
}
// 移除给定名称的“ExactlyOnce”并发配置
void FileServer::RemoveExactlyOnceConcurrency(const string& name) {
WriteLock lock(mReadWriteLock);
mPipelineNameEOConcurrencyMap.erase(name);
}
} // namespace logtail