core/ebpf/EBPFServer.cpp (426 lines of code) (raw):
// Copyright 2025 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ebpf/EBPFServer.h"
#include <future>
#include <string>
#include <vector>
#include "app_config/AppConfig.h"
#include "common/Flags.h"
#include "common/LogtailCommonFlags.h"
#include "common/MachineInfoUtil.h"
#include "common/http/AsynCurlRunner.h"
#include "common/magic_enum.hpp"
#include "ebpf/Config.h"
#include "ebpf/include/export.h"
#include "logger/Logger.h"
#include "monitor/metric_models/ReentrantMetricsRecord.h"
// #include "plugin/file_security/FileSecurityManager.h"
#include "plugin/network_observer/NetworkObserverManager.h"
#include "plugin/network_security/NetworkSecurityManager.h"
#include "plugin/process_security/ProcessSecurityManager.h"
DEFINE_FLAG_INT64(kernel_min_version_for_ebpf,
"the minimum kernel version that supported eBPF normal running, 4.19.0.0 -> 4019000000",
4019000000);
namespace logtail::ebpf {
static const uint16_t kKernelVersion310 = 3010; // for centos7
static const std::string kKernelNameCentos = "CentOS";
static const uint16_t kKernelCentosMinVersion = 7006;
bool EnvManager::IsSupportedEnv(PluginType type) {
if (!mInited) {
LOG_ERROR(sLogger, ("env manager not inited ...", ""));
return false;
}
bool status = false;
switch (type) {
case PluginType::NETWORK_OBSERVE:
status = mArchSupport && (mBTFSupport || m310Support);
break;
case PluginType::FILE_SECURITY:
case PluginType::NETWORK_SECURITY:
case PluginType::PROCESS_SECURITY: {
status = mArchSupport && mBTFSupport;
break;
}
default:
status = false;
}
if (!status) {
LOG_WARNING(sLogger,
("runtime env not supported, plugin type: ", int(type))("arch support is ", mArchSupport)(
"btf support is ", mBTFSupport)("310 support is ", m310Support));
}
return status;
}
bool EnvManager::AbleToLoadDyLib() {
return mArchSupport;
}
void EnvManager::InitEnvInfo() {
if (mInited) {
return;
}
mInited = true;
#ifdef _MSC_VER
LOG_WARNING(sLogger, ("MS", "not supported"));
mArchSupport = false;
return;
#elif defined(__aarch64__)
LOG_WARNING(sLogger, ("aarch64", "not supported"));
mArchSupport = false;
return;
#elif defined(__arm__)
LOG_WARNING(sLogger, ("arm", "not supported"));
mArchSupport = false;
return;
#elif defined(__i386__)
LOG_WARNING(sLogger, ("i386", "not supported"));
mArchSupport = false;
return;
#endif
mArchSupport = true;
std::string release;
int64_t version = 0;
GetKernelInfo(release, version);
LOG_INFO(sLogger, ("ebpf kernel release", release)("kernel version", version));
if (release.empty()) {
LOG_WARNING(sLogger, ("cannot find kernel release", ""));
mBTFSupport = false;
return;
}
if (version >= INT64_FLAG(kernel_min_version_for_ebpf)) {
mBTFSupport = true;
return;
}
if (version / 1000000 != kKernelVersion310) {
LOG_WARNING(sLogger, ("unsupported kernel version, will not start eBPF plugin ... version", version));
m310Support = false;
return;
}
std::string os;
int64_t osVersion = 0;
if (GetRedHatReleaseInfo(os, osVersion, STRING_FLAG(default_container_host_path))
|| GetRedHatReleaseInfo(os, osVersion)) {
if (os == kKernelNameCentos && osVersion >= kKernelCentosMinVersion) {
m310Support = true;
return;
}
LOG_WARNING(
sLogger,
("unsupported os for 310 kernel, will not start eBPF plugin ...", "")("os", os)("version", osVersion));
m310Support = false;
return;
}
LOG_WARNING(sLogger, ("not redhat release, will not start eBPF plugin ...", ""));
m310Support = false;
}
bool EBPFServer::IsSupportedEnv(PluginType type) {
return mEnvMgr.IsSupportedEnv(type);
}
void EBPFServer::Init() {
if (mInited) {
return;
}
mEnvMgr.InitEnvInfo();
if (!mEnvMgr.AbleToLoadDyLib()) {
return;
}
mInited = true;
mRunning = true;
mHostIp = GetHostIp();
mHostName = GetHostName();
// read host path prefix
if (AppConfig::GetInstance()->IsPurageContainerMode()) {
mHostPathPrefix = STRING_FLAG(default_container_host_path);
LOG_INFO(sLogger, ("running in container mode, would set host path prefix to ", mHostPathPrefix));
} else {
LOG_INFO(sLogger, ("running in host mode", "would not set host path prefix ..."));
mHostPathPrefix = "/";
}
LOG_DEBUG(sLogger, ("begin to init timer", ""));
Timer::GetInstance()->Init();
AsynCurlRunner::GetInstance()->Init();
LOG_DEBUG(sLogger, ("begin to start poller", ""));
mPoller = async(std::launch::async, &EBPFServer::PollPerfBuffers, this);
LOG_DEBUG(sLogger, ("begin to start handler", ""));
mHandler = async(std::launch::async, &EBPFServer::HandlerEvents, this);
// check env
// mMonitorMgr = std::make_unique<eBPFSelfMonitorMgr>();
DynamicMetricLabels dynamicLabels;
dynamicLabels.emplace_back(METRIC_LABEL_KEY_PROJECT, [this]() -> std::string { return this->GetAllProjects(); });
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(
mRef,
MetricCategory::METRIC_CATEGORY_RUNNER,
{{METRIC_LABEL_KEY_RUNNER_NAME, METRIC_LABEL_VALUE_RUNNER_NAME_EBPF_SERVER}},
std::move(dynamicLabels));
mPollProcessEventsTotal = mRef.CreateCounter(METRIC_RUNNER_EBPF_POLL_PROCESS_EVENTS_TOTAL);
mLossProcessEventsTotal = mRef.CreateCounter(METRIC_RUNNER_EBPF_LOSS_PROCESS_EVENTS_TOTAL);
mProcessCacheMissTotal = mRef.CreateCounter(METRIC_RUNNER_EBPF_PROCESS_CACHE_MISS_TOTAL);
mProcessCacheSize = mRef.CreateIntGauge(METRIC_RUNNER_EBPF_PROCESS_CACHE_SIZE);
mEBPFAdapter->Init();
mProcessCacheManager = std::make_shared<ProcessCacheManager>(mEBPFAdapter,
mHostName,
mHostPathPrefix,
mDataEventQueue,
mPollProcessEventsTotal,
mLossProcessEventsTotal,
mProcessCacheMissTotal,
mProcessCacheSize);
// ebpf config
auto configJson = AppConfig::GetInstance()->GetConfig();
mAdminConfig.LoadEbpfConfig(configJson);
}
void EBPFServer::Stop() {
if (!mInited) {
return;
}
mInited = false;
mRunning = false;
LOG_INFO(sLogger, ("begin to stop all plugins", ""));
for (int i = 0; i < int(PluginType::MAX); i++) {
auto pipelineName = mLoadedPipeline[i];
if (pipelineName.size()) {
bool ret = DisablePlugin(pipelineName, static_cast<PluginType>(i));
LOG_INFO(sLogger,
("force stop plugin",
magic_enum::enum_name(static_cast<PluginType>(i)))("pipeline", pipelineName)("ret", ret));
}
}
std::future_status s1 = mPoller.wait_for(std::chrono::seconds(1));
std::future_status s2 = mHandler.wait_for(std::chrono::seconds(1));
if (mPoller.valid()) {
if (s1 == std::future_status::ready) {
LOG_DEBUG(sLogger, ("poller thread", "stopped successfully"));
} else {
LOG_WARNING(sLogger, ("poller thread", "forced to stopped"));
}
}
if (mHandler.valid()) {
if (s2 == std::future_status::ready) {
LOG_DEBUG(sLogger, ("handler thread", "stopped successfully"));
} else {
LOG_WARNING(sLogger, ("handler thread", "forced to stopped"));
}
}
}
// maybe update or create
bool EBPFServer::startPluginInternal(const std::string& pipelineName,
uint32_t pluginIndex,
PluginType type,
const logtail::CollectionPipelineContext* ctx,
const std::variant<SecurityOptions*, ObserverNetworkOption*>& options,
const PluginMetricManagerPtr& metricManager) {
std::string prevPipelineName = CheckLoadedPipelineName(type);
if (prevPipelineName == pipelineName) {
LOG_INFO(sLogger, ("begin to update plugin", magic_enum::enum_name(type)));
auto pluginMgr = GetPluginManager(type);
if (pluginMgr) {
int res = pluginMgr->Update(options);
LOG_WARNING(sLogger, ("update plugin for type", magic_enum::enum_name(type))("res", res));
if (res) {
return false;
}
res = pluginMgr->Resume(options);
if (res) {
return false;
}
pluginMgr->UpdateContext(ctx, ctx->GetProcessQueueKey(), pluginIndex);
LOG_WARNING(sLogger, ("resume plugin for type", magic_enum::enum_name(type))("res", res));
return true;
}
LOG_ERROR(sLogger, ("no plugin registered, should not happen", magic_enum::enum_name(type)));
return false;
}
UpdatePipelineName(type, pipelineName, ctx->GetProjectName());
if (type != PluginType::NETWORK_OBSERVE) {
auto res = mProcessCacheManager->Init();
LOG_INFO(sLogger, ("ProcessCacheManager inited", res));
}
// step1: convert options to export type
auto eBPFConfig = std::make_unique<PluginConfig>();
eBPFConfig->mPluginType = type;
// call update function
// step2: call init function
auto pluginMgr = GetPluginManager(type);
switch (type) {
case PluginType::PROCESS_SECURITY: {
if (!pluginMgr) {
pluginMgr = ProcessSecurityManager::Create(
mProcessCacheManager, mEBPFAdapter, mDataEventQueue, metricManager);
UpdatePluginManager(type, pluginMgr);
}
break;
}
case PluginType::NETWORK_OBSERVE: {
if (!pluginMgr) {
pluginMgr = NetworkObserverManager::Create(
mProcessCacheManager, mEBPFAdapter, mDataEventQueue, metricManager);
UpdatePluginManager(type, pluginMgr);
}
break;
}
case PluginType::NETWORK_SECURITY: {
if (!pluginMgr) {
pluginMgr = NetworkSecurityManager::Create(
mProcessCacheManager, mEBPFAdapter, mDataEventQueue, metricManager);
UpdatePluginManager(type, pluginMgr);
}
break;
}
// case PluginType::FILE_SECURITY: {
// if (!pluginMgr) {
// pluginMgr
// = FileSecurityManager::Create(mProcessCacheManager, mEBPFAdapter, mDataEventQueue,
// metricManager);
// UpdatePluginManager(type, pluginMgr);
// }
// break;
// }
default:
LOG_ERROR(sLogger, ("unknown plugin type", int(type)));
return false;
}
pluginMgr->UpdateContext(ctx, ctx->GetProcessQueueKey(), pluginIndex);
return (pluginMgr->Init(options) == 0);
}
bool EBPFServer::HasRegisteredPlugins() const {
std::lock_guard<std::mutex> lk(mMtx);
for (const auto& pipeline : mLoadedPipeline) {
if (!pipeline.empty()) {
return true;
}
}
return false;
}
bool EBPFServer::EnablePlugin(const std::string& pipelineName,
uint32_t pluginIndex,
PluginType type,
const CollectionPipelineContext* ctx,
const std::variant<SecurityOptions*, ObserverNetworkOption*>& options,
const PluginMetricManagerPtr& mgr) {
if (!IsSupportedEnv(type)) {
return false;
}
return startPluginInternal(pipelineName, pluginIndex, type, ctx, options, mgr);
}
bool EBPFServer::CheckIfNeedStopProcessCacheManager() const {
std::lock_guard<std::mutex> lk(mMtx);
auto nsMgr = mPlugins[static_cast<int>(PluginType::NETWORK_SECURITY)];
auto psMgr = mPlugins[static_cast<int>(PluginType::PROCESS_SECURITY)];
auto fsMgr = mPlugins[static_cast<int>(PluginType::FILE_SECURITY)];
if ((nsMgr && nsMgr->IsExists()) || (psMgr && psMgr->IsExists()) || (fsMgr && fsMgr->IsExists())) {
return false;
}
LOG_INFO(sLogger, ("no security plugin registerd", "begin to stop base manager ... "));
return true;
}
bool EBPFServer::DisablePlugin(const std::string& pipelineName, PluginType type) {
if (!IsSupportedEnv(type)) {
return true;
}
std::string prevPipeline = CheckLoadedPipelineName(type);
if (prevPipeline == pipelineName) {
UpdatePipelineName(type, "", "");
} else {
LOG_WARNING(
sLogger,
("the specified config is not running, prev pipeline", prevPipeline)("curr pipeline", pipelineName));
return true;
}
LOG_INFO(sLogger, ("begin to stop plugin for ", magic_enum::enum_name(type))("pipeline", pipelineName));
auto pluginManager = GetPluginManager(type);
if (pluginManager && pluginManager->IsExists()) {
pluginManager->UpdateContext(nullptr, -1, -1);
int ret = pluginManager->Destroy();
if (ret == 0) {
UpdatePluginManager(type, nullptr);
// pluginManager->UpdateProcessCacheManager(nullptr); // deprecated ... TODO @qianlu.kk
LOG_DEBUG(sLogger, ("stop plugin for", magic_enum::enum_name(type))("pipeline", pipelineName));
if (type == PluginType::NETWORK_SECURITY || type == PluginType::PROCESS_SECURITY
|| type == PluginType::FILE_SECURITY) {
// check if we need stop ProcessCacheManager
if (CheckIfNeedStopProcessCacheManager()) {
mProcessCacheManager->Stop();
}
}
} else {
LOG_ERROR(sLogger, ("failed to stop plugin for", magic_enum::enum_name(type))("pipeline", pipelineName));
}
} else {
LOG_WARNING(sLogger,
("no plugin registered or not running, plugin type", magic_enum::enum_name(type))("pipeline",
pipelineName));
}
return true;
}
std::string EBPFServer::CheckLoadedPipelineName(PluginType type) {
std::lock_guard<std::mutex> lk(mMtx);
return mLoadedPipeline[int(type)];
}
std::string EBPFServer::GetAllProjects() {
std::lock_guard<std::mutex> lk(mMtx);
std::string res;
for (int i = 0; i < int(PluginType::MAX); i++) {
if (mPluginProject[i] != "") {
res += mPluginProject[i];
res += " ";
}
}
return res;
}
void EBPFServer::UpdatePipelineName(PluginType type, const std::string& name, const std::string& project) {
std::lock_guard<std::mutex> lk(mMtx);
mLoadedPipeline[int(type)] = name;
mPluginProject[int(type)] = project;
}
bool EBPFServer::SuspendPlugin(const std::string&, PluginType type) {
if (!IsSupportedEnv(type)) {
return false;
}
auto mgr = GetPluginManager(type);
if (!mgr || !mgr->IsRunning()) {
LOG_DEBUG(sLogger, ("plugin not registered or stopped", ""));
return true;
}
mgr->UpdateContext(nullptr, -1, -1);
int ret = mgr->Suspend();
if (ret) {
LOG_ERROR(sLogger, ("failed to suspend plugin", magic_enum::enum_name(type)));
return false;
}
return true;
}
void EBPFServer::PollPerfBuffers() {
mFrequencyMgr.SetPeriod(std::chrono::milliseconds(100));
while (mRunning) {
auto now = std::chrono::steady_clock::now();
auto nextWindow = mFrequencyMgr.Next();
if (!mFrequencyMgr.Expired(now)) {
std::this_thread::sleep_until(nextWindow);
mFrequencyMgr.Reset(nextWindow);
} else {
mFrequencyMgr.Reset(now);
}
for (int i = 0; i < int(PluginType::MAX); i++) {
auto plugin = GetPluginManager(PluginType(i));
if (!plugin || !plugin->IsRunning()) {
continue;
}
int cnt = plugin->PollPerfBuffer();
LOG_DEBUG(sLogger,
("poll buffer for ", magic_enum::enum_name(PluginType(i)))("cnt", cnt)("running status",
plugin->IsRunning()));
}
}
}
std::shared_ptr<AbstractManager> EBPFServer::GetPluginManager(PluginType type) {
std::lock_guard<std::mutex> lk(mMtx);
if (type == PluginType::MAX) {
return nullptr;
}
return mPlugins[static_cast<int>(type)];
}
void EBPFServer::UpdatePluginManager(PluginType type, std::shared_ptr<AbstractManager> mgr) {
std::lock_guard<std::mutex> lk(mMtx);
if (type == PluginType::MAX) {
return;
}
mPlugins[static_cast<int>(type)] = mgr;
}
void EBPFServer::HandlerEvents() {
std::vector<std::shared_ptr<CommonEvent>> items(1024);
while (mRunning) {
// consume queue
size_t count = mDataEventQueue.wait_dequeue_bulk_timed(items.data(), 4096, std::chrono::milliseconds(200));
LOG_DEBUG(sLogger, ("get data events, number", count));
// handle ....
if (count == 0) {
continue;
}
for (size_t i = 0; i < count; i++) {
auto event = items[i];
auto pluginType = event->GetPluginType();
auto plugin = GetPluginManager(pluginType);
if (plugin) {
// handle event and put into aggregator ...
plugin->HandleEvent(event);
}
}
// handle
items.clear();
items.resize(1024);
}
}
} // namespace logtail::ebpf