core/ebpf/plugin/process_security/ProcessSecurityManager.cpp (191 lines of code) (raw):
// Copyright 2025 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ProcessSecurityManager.h"
#include <coolbpf/security/type.h>
#include <memory>
#include <mutex>
#include <utility>
#include "collection_pipeline/CollectionPipelineContext.h"
#include "collection_pipeline/queue/ProcessQueueItem.h"
#include "collection_pipeline/queue/ProcessQueueManager.h"
#include "common/HashUtil.h"
#include "common/TimeUtil.h"
#include "common/magic_enum.hpp"
#include "common/queue/blockingconcurrentqueue.h"
#include "common/timer/Timer.h"
#include "ebpf/Config.h"
#include "ebpf/EBPFServer.h"
#include "ebpf/plugin/AbstractManager.h"
#include "ebpf/plugin/ProcessCacheManager.h"
#include "ebpf/type/AggregateEvent.h"
#include "ebpf/type/table/BaseElements.h"
namespace logtail::ebpf {
ProcessSecurityManager::ProcessSecurityManager(const std::shared_ptr<ProcessCacheManager>& processCacheManager,
const std::shared_ptr<EBPFAdapter>& eBPFAdapter,
moodycamel::BlockingConcurrentQueue<std::shared_ptr<CommonEvent>>& queue,
const PluginMetricManagerPtr& metricManager)
: AbstractManager(processCacheManager, eBPFAdapter, queue, metricManager),
mAggregateTree(
4096,
[](std::unique_ptr<ProcessEventGroup>& base, const std::shared_ptr<CommonEvent>& other) {
base->mInnerEvents.emplace_back(other);
},
[](const std::shared_ptr<CommonEvent>& in, [[maybe_unused]] std::shared_ptr<SourceBuffer>& sourceBuffer) {
return std::make_unique<ProcessEventGroup>(in->mPid, in->mKtime);
}) {
}
int ProcessSecurityManager::Init(
[[maybe_unused]] const std::variant<SecurityOptions*, ObserverNetworkOption*>& options) {
// just set timer ...
// register base manager ...
mFlag = true;
mSuspendFlag = false;
auto processCacheMgr = GetProcessCacheManager();
if (processCacheMgr == nullptr) {
LOG_WARNING(sLogger, ("ProcessCacheManager is null", ""));
return 1;
}
processCacheMgr->MarkProcessEventFlushStatus(true);
std::shared_ptr<ScheduleConfig> scheduleConfig
= std::make_shared<ScheduleConfig>(PluginType::PROCESS_SECURITY, std::chrono::seconds(2));
ScheduleNext(std::chrono::steady_clock::now(), scheduleConfig);
return 0;
}
int ProcessSecurityManager::Destroy() {
mFlag = false;
auto processCacheMgr = GetProcessCacheManager();
if (processCacheMgr == nullptr) {
LOG_WARNING(sLogger, ("ProcessCacheManager is null", ""));
return 1;
}
processCacheMgr->MarkProcessEventFlushStatus(false);
return 0;
}
std::array<size_t, 1> GenerateAggKeyForProcessEvent(const std::shared_ptr<CommonEvent>& event) {
// calculate agg key
std::array<size_t, 1> hashResult{};
std::hash<uint64_t> hasher;
std::array<uint64_t, 2> arr = {uint64_t(event->mPid), event->mKtime};
for (uint64_t x : arr) {
AttrHashCombine(hashResult[0], hasher(x));
}
return hashResult;
}
int ProcessSecurityManager::HandleEvent(const std::shared_ptr<CommonEvent>& event) {
if (!event) {
return 1;
}
auto* processEvent = static_cast<ProcessEvent*>(event.get());
LOG_DEBUG(sLogger,
("receive event, pid", event->mPid)("ktime", event->mKtime)("eventType",
magic_enum::enum_name(event->mEventType)));
if (processEvent == nullptr) {
LOG_ERROR(sLogger,
("failed to convert CommonEvent to ProcessEvent, kernel event type",
magic_enum::enum_name(event->GetKernelEventType()))("PluginType",
magic_enum::enum_name(event->GetPluginType())));
return 1;
}
// calculate agg key
std::array<size_t, 1> hashResult = GenerateAggKeyForProcessEvent(event);
{
WriteLock lk(mLock);
bool ret = mAggregateTree.Aggregate(event, hashResult);
LOG_DEBUG(sLogger, ("after aggregate", ret));
}
return 0;
}
StringBuffer ToStringBuffer(std::shared_ptr<SourceBuffer> sourceBuffer, int32_t val) {
auto buf = sourceBuffer->AllocateStringBuffer(kMaxInt32Width);
auto end = fmt::format_to_n(buf.data, buf.capacity, "{}", val);
*end.out = '\0';
buf.size = end.size;
return buf;
}
bool ProcessSecurityManager::ScheduleNext(const std::chrono::steady_clock::time_point& execTime,
const std::shared_ptr<ScheduleConfig>& config) {
std::chrono::steady_clock::time_point nextTime = execTime + config->mInterval;
Timer::GetInstance()->PushEvent(std::make_unique<AggregateEvent>(nextTime, config));
return ConsumeAggregateTree(execTime);
}
bool ProcessSecurityManager::ConsumeAggregateTree(
[[maybe_unused]] const std::chrono::steady_clock::time_point& execTime) {
if (!mFlag || mSuspendFlag) {
return false;
}
WriteLock lk(mLock);
SIZETAggTree<ProcessEventGroup, std::shared_ptr<CommonEvent>> aggTree = this->mAggregateTree.GetAndReset();
lk.unlock();
// read aggregator
auto nodes = aggTree.GetNodesWithAggDepth(1);
LOG_DEBUG(sLogger, ("enter aggregator ...", nodes.size()));
if (nodes.empty()) {
LOG_DEBUG(sLogger, ("empty nodes...", ""));
return true;
}
auto sourceBuffer = std::make_shared<SourceBuffer>();
PipelineEventGroup sharedEventGroup(sourceBuffer);
PipelineEventGroup eventGroup(sourceBuffer);
for (auto& node : nodes) {
LOG_DEBUG(sLogger, ("child num", node->mChild.size()));
// convert to a item and push to process queue
aggTree.ForEach(node, [&](const ProcessEventGroup* group) {
auto sharedEvent = sharedEventGroup.CreateLogEvent();
// represent a process ...
auto processCacheMgr = GetProcessCacheManager();
if (processCacheMgr == nullptr) {
LOG_WARNING(sLogger, ("ProcessCacheManager is null", ""));
return;
}
auto hit = processCacheMgr->FinalizeProcessTags(group->mPid, group->mKtime, *sharedEvent);
if (!hit) {
LOG_WARNING(sLogger, ("cannot find tags for pid", group->mPid)("ktime", group->mKtime));
return;
}
for (const auto& innerEvent : group->mInnerEvents) {
auto* logEvent = eventGroup.AddLogEvent();
for (const auto& it : *sharedEvent) {
logEvent->SetContentNoCopy(it.first, it.second);
}
struct timespec ts = ConvertKernelTimeToUnixTime(innerEvent->mTimestamp);
logEvent->SetTimestamp(ts.tv_sec, ts.tv_nsec);
switch (innerEvent->mEventType) {
case KernelEventType::PROCESS_EXECVE_EVENT: {
logEvent->SetContentNoCopy(kCallName.LogKey(), ProcessSecurityManager::kExecveValue);
// ? kprobe or execve
logEvent->SetContentNoCopy(kEventType.LogKey(), ProcessSecurityManager::kKprobeValue);
break;
}
case KernelEventType::PROCESS_EXIT_EVENT: {
CommonEvent* ce = innerEvent.get();
auto* exitEvent = static_cast<ProcessExitEvent*>(ce);
logEvent->SetContentNoCopy(kCallName.LogKey(), StringView(ProcessSecurityManager::kExitValue));
logEvent->SetContentNoCopy(kEventType.LogKey(), StringView(AbstractManager::kKprobeValue));
auto exitCode = ToStringBuffer(eventGroup.GetSourceBuffer(), exitEvent->mExitCode);
auto exitTid = ToStringBuffer(eventGroup.GetSourceBuffer(), exitEvent->mExitTid);
logEvent->SetContentNoCopy(ProcessSecurityManager::kExitCodeKey,
StringView(exitCode.data, exitCode.size));
logEvent->SetContentNoCopy(ProcessSecurityManager::kExitTidKey,
StringView(exitTid.data, exitTid.size));
break;
}
case KernelEventType::PROCESS_CLONE_EVENT: {
logEvent->SetContentNoCopy(kCallName.LogKey(), ProcessSecurityManager::kCloneValue);
logEvent->SetContentNoCopy(kEventType.LogKey(), ProcessSecurityManager::kKprobeValue);
break;
}
default:
break;
}
}
});
}
{
std::lock_guard lk(mContextMutex);
if (this->mPipelineCtx == nullptr) {
return true;
}
LOG_DEBUG(sLogger, ("event group size", eventGroup.GetEvents().size()));
ADD_COUNTER(mPushLogsTotal, eventGroup.GetEvents().size());
ADD_COUNTER(mPushLogGroupTotal, 1);
std::unique_ptr<ProcessQueueItem> item
= std::make_unique<ProcessQueueItem>(std::move(eventGroup), this->mPluginIndex);
if (QueueStatus::OK != ProcessQueueManager::GetInstance()->PushQueue(mQueueKey, std::move(item))) {
LOG_WARNING(sLogger,
("configName", mPipelineCtx->GetConfigName())("pluginIdx", this->mPluginIndex)(
"[ProcessSecurityEvent] push queue failed!", ""));
}
}
return true;
}
} // namespace logtail::ebpf