core/ebpf/plugin/ProcessCacheManager.cpp (699 lines of code) (raw):

// Copyright 2025 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include "ebpf/plugin/ProcessCacheManager.h" #include <charconv> #include <coolbpf/security/bpf_common.h> #include <coolbpf/security/bpf_process_event_type.h> #include <coolbpf/security/data_msg.h> #include <coolbpf/security/msg_type.h> #include <cstdint> #include <algorithm> #include <atomic> #include <unordered_map> #include <utility> #include "_thirdparty/coolbpf/src/security/bpf_process_event_type.h" #include "common/CapabilityUtil.h" #include "common/EncodingUtil.h" #include "common/ProcParser.h" #include "common/StringTools.h" #include "common/StringView.h" #include "ebpf/type/ProcessEvent.h" #include "logger/Logger.h" #include "monitor/metric_models/ReentrantMetricsRecord.h" #include "type/table/BaseElements.h" #include "util/FrequencyManager.h" namespace logtail { namespace ebpf { /////////// ================= for perfbuffer handlers ================= /////////// constexpr size_t kDataArgOffset = offsetof(msg_data, arg); void HandleKernelProcessEvent(void* ctx, int cpu, void* data, uint32_t data_sz) { auto* processCacheMgr = static_cast<ProcessCacheManager*>(ctx); if (!processCacheMgr) { LOG_ERROR(sLogger, ("ProcessCacheManager is null!", "")); return; } if (!data) { LOG_ERROR(sLogger, ("data is null!", "")); return; } processCacheMgr->UpdateRecvEventTotal(); auto* common = static_cast<struct msg_common*>(data); switch (common->op) { case MSG_OP_CLONE: { auto* event = static_cast<struct msg_clone_event*>(data); processCacheMgr->RecordCloneEvent(event); break; } case MSG_OP_EXIT: { auto* event = static_cast<struct msg_exit*>(data); processCacheMgr->RecordExitEvent(event); break; } case MSG_OP_EXECVE: { auto* eventPtr = static_cast<struct msg_execve_event*>(data); processCacheMgr->RecordExecveEvent(eventPtr); break; } case MSG_OP_DATA: { auto* eventPtr = static_cast<msg_data*>(data); processCacheMgr->RecordDataEvent(eventPtr); break; } default: { LOG_WARNING(sLogger, ("Unknown event op", static_cast<int>(common->op))); break; } } } void HandleKernelProcessEventLost(void* ctx, int cpu, unsigned long long cnt) { auto* processCacheMgr = static_cast<ProcessCacheManager*>(ctx); if (!processCacheMgr) { LOG_ERROR(sLogger, ("ProcessCacheManager is null!", "")("lost events", cnt)("cpu", cpu)); return; } processCacheMgr->UpdateLossEventTotal(cnt); } //////////////////////////////////////////////////////////////////////////////////////// /* Decode rawArgs to readable args string * @param rawArgs: \0 separated arg buffer, e.g. arg1\0arg 2\0arg"3" * @return: decoded args string, e.g. arg1 "arg 2" arg\"3\" */ std::string DecodeArgs(StringView& rawArgs) { std::string args; if (rawArgs.empty()) { return args; } args.reserve(rawArgs.size() * 2); StringViewSplitter splitter(rawArgs, kNullSv); bool first = true; for (auto field : splitter) { if (first) { first = false; } else { args += " "; } if (field.find(' ') != std::string::npos || field.find('\t') != std::string::npos || field.find('\n') != std::string::npos) { args += "\""; for (char c : field) { if (c == '"' || c == '\\') { args += '\\'; // Escape the character } if (c == '\n') { args += "\\n"; } else if (c == '\t') { args += "\\t"; } else { args += c; } } args += "\""; } else { args.append(field.data(), field.size()); } } return args; } ProcessCacheManager::ProcessCacheManager(std::shared_ptr<EBPFAdapter>& eBPFAdapter, const std::string& hostName, const std::string& hostPathPrefix, moodycamel::BlockingConcurrentQueue<std::shared_ptr<CommonEvent>>& queue, CounterPtr pollEventsTotal, CounterPtr lossEventsTotal, CounterPtr cacheMissTotal, IntGaugePtr cacheSize) : mEBPFAdapter(eBPFAdapter), mProcParser(hostPathPrefix), mHostName(hostName), mHostPathPrefix(hostPathPrefix), mCommonEventQueue(queue), mPollProcessEventsTotal(std::move(pollEventsTotal)), mLossProcessEventsTotal(std::move(lossEventsTotal)), mProcessCacheMissTotal(std::move(cacheMissTotal)), mProcessCacheSize(std::move(cacheSize)) { mDataMap.reserve(kInitDataMapSize); } bool ProcessCacheManager::Init() { if (mInited) { return true; } mInited = true; mFrequencyMgr.SetPeriod(std::chrono::milliseconds(100)); auto ebpfConfig = std::make_unique<PluginConfig>(); ebpfConfig->mPluginType = PluginType::PROCESS_SECURITY; ProcessConfig pconfig; pconfig.mPerfBufferSpec = {{"tcpmon_map", 128, this, HandleKernelProcessEvent, HandleKernelProcessEventLost}}; ebpfConfig->mConfig = pconfig; mRunFlag = true; mPoller = async(std::launch::async, &ProcessCacheManager::pollPerfBuffers, this); bool status = mEBPFAdapter->StartPlugin(PluginType::PROCESS_SECURITY, std::move(ebpfConfig)); if (!status) { LOG_ERROR(sLogger, ("failed to start process security plugin", "")); return false; } auto ret = syncAllProc(); if (ret) { LOG_WARNING(sLogger, ("failed to sync all proc, ret", ret)); } return true; } void ProcessCacheManager::Stop() { if (!mInited) { return; } auto res = mEBPFAdapter->StopPlugin(PluginType::PROCESS_SECURITY); LOG_INFO(sLogger, ("stop process probes for base manager, status", res)); mRunFlag = false; std::future_status s1 = mPoller.wait_for(std::chrono::seconds(1)); if (mPoller.valid()) { if (s1 == std::future_status::ready) { LOG_INFO(sLogger, ("poller thread", "stopped successfully")); } else { LOG_WARNING(sLogger, ("poller thread", "forced to stopped")); } } mInited = false; } void ProcessCacheManager::UpdateRecvEventTotal(uint64_t count) { ADD_COUNTER(mPollProcessEventsTotal, count); } void ProcessCacheManager::UpdateLossEventTotal(uint64_t count) { ADD_COUNTER(mLossProcessEventsTotal, count); } void ProcessCacheManager::RecordExecveEvent(msg_execve_event* eventPtr) { auto cacheValue = msgExecveEventToProcessCacheValue(*eventPtr); mProcessCache.IncRef({cacheValue->mPPid, cacheValue->mPKtime}); LOG_DEBUG(sLogger, ("push execve event. IncRef pid", cacheValue->mPPid)("ktime", cacheValue->mPKtime)); mProcessCache.AddCache({eventPtr->process.pid, eventPtr->process.ktime}, std::move(cacheValue)); LOG_DEBUG(sLogger, ("push execve event. AddCache pid", eventPtr->process.pid)("ktime", eventPtr->process.ktime)); if (eventPtr->cleanup_key.ktime != 0) { auto parent = mProcessCache.Lookup({eventPtr->cleanup_key.pid, eventPtr->cleanup_key.ktime}); if (parent) { // dec grand parent's ref count mProcessCache.DecRef({parent->mPPid, parent->mPKtime}, eventPtr->common.ktime); LOG_DEBUG(sLogger, ("push execve event. DecRef pid", parent->mPPid)("ktime", parent->mPKtime)); } mProcessCache.DecRef({eventPtr->cleanup_key.pid, eventPtr->cleanup_key.ktime}, eventPtr->common.ktime); LOG_DEBUG(sLogger, ("push execve event. DecRef pid", eventPtr->cleanup_key.pid)("ktime", eventPtr->cleanup_key.ktime)); } if (mFlushProcessEvent) { auto processEvent = std::make_shared<ProcessEvent>(eventPtr->process.pid, eventPtr->process.ktime, KernelEventType::PROCESS_EXECVE_EVENT, eventPtr->common.ktime); mCommonEventQueue.enqueue(std::move(processEvent)); } mProcessCache.ClearExpiredCache(eventPtr->process.ktime); // restrict memory usage in abnormal conditions // if we cannot clear old data, just clear all // TODO: maybe we can iterate over the /proc folder and remove unexisting entries if (mProcessCache.Size() > kMaxCacheSize) { LOG_WARNING(sLogger, ("process cache size exceed limit", kMaxCacheSize)("size", mProcessCache.Size())); mProcessCache.ClearCache(); } } void ProcessCacheManager::RecordExitEvent(msg_exit* eventPtr) { auto cacheValue = mProcessCache.Lookup({eventPtr->current.pid, eventPtr->current.ktime}); if (cacheValue) { // dec self and parent's ref mProcessCache.DecRef({cacheValue->mPPid, cacheValue->mPKtime}, eventPtr->common.ktime); LOG_DEBUG(sLogger, ("push exit event. DecRef pid", cacheValue->mPPid)("ktime", cacheValue->mPKtime)); mProcessCache.DecRef({eventPtr->current.pid, eventPtr->current.ktime}, eventPtr->common.ktime); LOG_DEBUG(sLogger, ("push exit event. DecRef pid", eventPtr->current.pid)("ktime", eventPtr->current.ktime)); } if (mFlushProcessEvent) { auto event = std::make_shared<ProcessExitEvent>(eventPtr->current.pid, eventPtr->current.ktime, KernelEventType::PROCESS_EXIT_EVENT, eventPtr->common.ktime, eventPtr->info.code, eventPtr->info.tid); if (event) { mCommonEventQueue.enqueue(std::move(event)); } } mProcessCache.ClearExpiredCache(eventPtr->common.ktime); } void ProcessCacheManager::RecordCloneEvent(msg_clone_event* eventPtr) { auto cacheValue = msgCloneEventToProcessCacheValue(*eventPtr); if (!cacheValue) { return; } mProcessCache.IncRef({eventPtr->parent.pid, eventPtr->parent.ktime}); LOG_DEBUG(sLogger, ("push clone event. IncRef pid", eventPtr->parent.pid)("ktime", eventPtr->parent.ktime)); mProcessCache.AddCache({eventPtr->tgid, eventPtr->ktime}, std::move(cacheValue)); LOG_DEBUG(sLogger, ("push clone event. AddCache pid", eventPtr->tgid)("ktime", eventPtr->ktime)); if (mFlushProcessEvent) { auto event = std::make_shared<ProcessEvent>(static_cast<uint32_t>(eventPtr->tgid), static_cast<uint64_t>(eventPtr->ktime), KernelEventType::PROCESS_CLONE_EVENT, static_cast<uint64_t>(eventPtr->common.ktime)); if (event) { mCommonEventQueue.enqueue(std::move(event)); } } mProcessCache.ClearExpiredCache(eventPtr->common.ktime); // restrict memory usage in abnormal conditions // if we cannot clear old data, just clear all // TODO: maybe we can iterate over the /proc folder and remove unexisting entries // if (mProcessCache.Size() > kMaxCacheSize) { if (mProcessCache.Size() > 3000) { LOG_WARNING(sLogger, ("process cache size exceed limit", kMaxCacheSize)("size", mProcessCache.Size())); mProcessCache.PrintDebugInfo(); mProcessCache.ClearCache(); } } void ProcessCacheManager::RecordDataEvent(msg_data* eventPtr) { LOG_DEBUG(sLogger, ("[receive_data_event] size", eventPtr->common.size)("pid", eventPtr->id.pid)("time", eventPtr->id.time)( "data", std::string(eventPtr->arg, eventPtr->common.size - offsetof(msg_data, arg)))); dataAdd(eventPtr); } std::string ProcessCacheManager::GenerateExecId(uint32_t pid, uint64_t ktime) { // /proc/sys/kernel/pid_max is usually 7 digits 4194304 // nano timestamp is usually 19 digits std::string execid; execid.reserve(mHostName.size() + 1 + 19 + 1 + 7); execid.assign(mHostName).append(":").append(std::to_string(ktime)).append(":").append(std::to_string(pid)); return Base64Enconde(execid); } bool ProcessCacheManager::FinalizeProcessTags(uint32_t pid, uint64_t ktime, LogEvent& logEvent) { static const std::string kUnkownStr = "unknown"; auto procPtr = mProcessCache.Lookup({pid, ktime}); if (!procPtr) { ADD_COUNTER(mProcessCacheMissTotal, 1); LOG_WARNING(sLogger, ("cannot find proc in cache, pid", pid)("ktime", ktime)("size", mProcessCache.Size())); return false; } // event_type, added by xxx_security_manager // call_name, added by xxx_security_manager // event_time, added by xxx_security_manager // finalize proc tags auto& proc = *procPtr; auto& sb = logEvent.GetSourceBuffer(); auto execIdSb = sb->CopyString(proc.Get<kExecId>()); logEvent.SetContentNoCopy(kExecId.LogKey(), StringView(execIdSb.data, execIdSb.size)); auto pidSb = sb->CopyString(proc.Get<kProcessId>()); logEvent.SetContentNoCopy(kProcessId.LogKey(), StringView(pidSb.data, pidSb.size)); auto uidSb = sb->CopyString(proc.Get<kUid>()); logEvent.SetContentNoCopy(kUid.LogKey(), StringView(uidSb.data, uidSb.size)); auto userSb = sb->CopyString(proc.Get<kUser>()); logEvent.SetContentNoCopy(kUser.LogKey(), StringView(userSb.data, userSb.size)); auto binarySb = sb->CopyString(proc.Get<kBinary>()); logEvent.SetContentNoCopy(kBinary.LogKey(), StringView(binarySb.data, binarySb.size)); auto argsSb = sb->CopyString(proc.Get<kArguments>()); logEvent.SetContentNoCopy(kArguments.LogKey(), StringView(argsSb.data, argsSb.size)); auto cwdSb = sb->CopyString(proc.Get<kCWD>()); logEvent.SetContentNoCopy(kCWD.LogKey(), StringView(cwdSb.data, cwdSb.size)); auto ktimeSb = sb->CopyString(proc.Get<kKtime>()); logEvent.SetContentNoCopy(kKtime.LogKey(), StringView(ktimeSb.data, ktimeSb.size)); auto permitted = sb->CopyString(proc.Get<kCapPermitted>()); logEvent.SetContentNoCopy(kCapPermitted.LogKey(), StringView(permitted.data, permitted.size)); auto effective = sb->CopyString(proc.Get<kCapEffective>()); logEvent.SetContentNoCopy(kCapEffective.LogKey(), StringView(effective.data, effective.size)); auto inheritable = sb->CopyString(proc.Get<kCapInheritable>()); logEvent.SetContentNoCopy(kCapInheritable.LogKey(), StringView(inheritable.data, inheritable.size)); auto parentProcPtr = mProcessCache.Lookup({proc.mPPid, proc.mPKtime}); // for parent if (!parentProcPtr) { return true; } // finalize parent tags auto& parentProc = *parentProcPtr; auto parentExecIdSb = sb->CopyString(parentProc.Get<kExecId>()); logEvent.SetContentNoCopy(kParentExecId.LogKey(), StringView(parentExecIdSb.data, parentExecIdSb.size)); auto parentPidSb = sb->CopyString(parentProc.Get<kProcessId>()); logEvent.SetContentNoCopy(kParentProcessId.LogKey(), StringView(parentPidSb.data, parentPidSb.size)); auto parentUidSb = sb->CopyString(parentProc.Get<kUid>()); logEvent.SetContentNoCopy(kParentUid.LogKey(), StringView(parentUidSb.data, parentUidSb.size)); auto parentUserSb = sb->CopyString(parentProc.Get<kUser>()); logEvent.SetContentNoCopy(kParentUser.LogKey(), StringView(parentUserSb.data, parentUserSb.size)); auto parentBinarySb = sb->CopyString(parentProc.Get<kBinary>()); logEvent.SetContentNoCopy(kParentBinary.LogKey(), StringView(parentBinarySb.data, parentBinarySb.size)); auto parentArgsSb = sb->CopyString(parentProc.Get<kArguments>()); logEvent.SetContentNoCopy(kParentArguments.LogKey(), StringView(parentArgsSb.data, parentArgsSb.size)); auto parentCwdSb = sb->CopyString(parentProc.Get<kCWD>()); logEvent.SetContentNoCopy(kParentCWD.LogKey(), StringView(parentCwdSb.data, parentCwdSb.size)); auto parentKtimeSb = sb->CopyString(parentProc.Get<kKtime>()); logEvent.SetContentNoCopy(kParentKtime.LogKey(), StringView(parentKtimeSb.data, parentKtimeSb.size)); auto parentPermitted = sb->CopyString(parentProc.Get<kCapPermitted>()); logEvent.SetContentNoCopy(kParentCapPermitted.LogKey(), StringView(parentPermitted.data, parentPermitted.size)); auto parentEffective = sb->CopyString(parentProc.Get<kCapEffective>()); logEvent.SetContentNoCopy(kParentCapEffective.LogKey(), StringView(parentEffective.data, parentEffective.size)); auto parentInheritable = sb->CopyString(parentProc.Get<kCapInheritable>()); logEvent.SetContentNoCopy(kParentCapInheritable.LogKey(), StringView(parentInheritable.data, parentInheritable.size)); return true; } int ProcessCacheManager::syncAllProc() { std::vector<std::shared_ptr<Proc>> procs = listRunningProcs(); // update execve map for (auto& proc : procs) { writeProcToBPFMap(proc); } // add kernel thread (pid 0) msg_execve_key key{}; key.pid = 0; key.ktime = 0; execve_map_value value{}; value.pkey.pid = 0; value.pkey.ktime = 1; value.key.pid = 0; value.key.ktime = 1; mEBPFAdapter->BPFMapUpdateElem(PluginType::PROCESS_SECURITY, "execve_map", &key.pid, &value, 0); // generage execve event ... for (const auto& proc : procs) { pushProcEvent(*proc); } return 0; } std::vector<std::shared_ptr<Proc>> ProcessCacheManager::listRunningProcs() { std::vector<std::shared_ptr<Proc>> processes; for (const auto& entry : std::filesystem::directory_iterator(mHostPathPrefix / "proc")) { if (!entry.is_directory()) { continue; } std::string dirName = entry.path().filename().string(); int32_t pid = 0; if (!StringTo(dirName, pid)) { continue; } auto procPtr = std::make_shared<Proc>(); if (mProcParser.ParseProc(pid, *procPtr)) { processes.emplace_back(procPtr); } } LOG_DEBUG(sLogger, ("Read ProcFS prefix", mHostPathPrefix)("append process cnt", processes.size())); return processes; } int ProcessCacheManager::writeProcToBPFMap(const std::shared_ptr<Proc>& proc) { // Proc -> execve_map_value execve_map_value value{}; value.pkey.pid = proc->ppid; value.pkey.ktime = proc->pktime; value.key.pid = proc->pid; value.key.ktime = proc->ktime; value.flags = 0; value.nspid = proc->nspid; value.caps = {{{proc->permitted, proc->effective, proc->inheritable}}}; value.ns = {{{proc->uts_ns, proc->ipc_ns, proc->mnt_ns, proc->pid, proc->pid_for_children_ns, proc->net_ns, proc->time_ns, proc->time_for_children_ns, proc->cgroup_ns, proc->user_ns}}}; value.bin.path_length = proc->exe.size(); ::memcpy(value.bin.path, proc->exe.data(), std::min(BINARY_PATH_MAX_LEN, static_cast<int>(proc->exe.size()))); // update bpf map int res = mEBPFAdapter->BPFMapUpdateElem(PluginType::PROCESS_SECURITY, "execve_map", &proc->pid, &value, 0); LOG_DEBUG(sLogger, ("update bpf map, pid", proc->pid)("res", res)); return res; } void ProcessCacheManager::pushProcEvent(const Proc& proc) { std::shared_ptr<ProcessCacheValue> cacheValue = procToProcessCacheValue(proc); mProcessCache.AddCache({proc.pid, proc.ktime}, std::move(cacheValue)); LOG_DEBUG(sLogger, ("push proc event. AddCache pid", proc.pid)("ktime", proc.ktime)); } void ProcessCacheManager::pollPerfBuffers() { int zero = 0; LOG_DEBUG(sLogger, ("enter poller thread", "")); while (mRunFlag) { auto now = std::chrono::steady_clock::now(); auto nextWindow = mFrequencyMgr.Next(); if (!mFrequencyMgr.Expired(now)) { std::this_thread::sleep_until(nextWindow); mFrequencyMgr.Reset(nextWindow); } else { mFrequencyMgr.Reset(now); } auto ret = mEBPFAdapter->PollPerfBuffers( PluginType::PROCESS_SECURITY, kDefaultMaxBatchConsumeSize, &zero, kDefaultMaxWaitTimeMS); LOG_DEBUG(sLogger, ("poll event num", ret)); } LOG_DEBUG(sLogger, ("exit poller thread", "")); SET_GAUGE(mProcessCacheSize, mProcessCache.Size()); } std::shared_ptr<ProcessCacheValue> ProcessCacheManager::procToProcessCacheValue(const Proc& proc) { auto cacheValue = std::make_shared<ProcessCacheValue>(); auto execId = GenerateExecId(proc.pid, proc.ktime); StringView rawArgs = proc.cmdline; auto nullPos = rawArgs.find('\0'); if (nullPos != std::string::npos) { rawArgs = rawArgs.substr(nullPos + 1); } else { rawArgs = rawArgs.substr(rawArgs.size(), 0); } cacheValue->mPPid = proc.ppid; cacheValue->mPKtime = proc.pktime; cacheValue->SetContent<kArguments>(DecodeArgs(rawArgs)); LOG_DEBUG(sLogger, ("raw_args", rawArgs)("args", cacheValue->Get<kArguments>())); cacheValue->SetContent<kExecId>(execId); cacheValue->SetContent<kProcessId>(proc.pid); cacheValue->SetContent<kCWD>(proc.cwd); cacheValue->SetContent<kKtime>(proc.ktime); if (proc.cmdline.empty()) { cacheValue->SetContent<kBinary>(proc.comm); // event.process.nspid = proc.nspid; cacheValue->SetContent<kUid>(0); auto userName = mProcParser.GetUserNameByUid(0); cacheValue->SetContent<kUser>(userName); // event.process.auid = std::numeric_limits<uint32_t>::max(); // event.process.flags = static_cast<uint32_t>(EVENT_PROC_FS); } else { cacheValue->SetContent<kBinary>(proc.exe); cacheValue->SetContent<kUid>(proc.effectiveUid); auto userName = mProcParser.GetUserNameByUid(proc.effectiveUid); auto permitted = GetCapabilities(proc.permitted, *cacheValue->GetSourceBuffer()); auto effective = GetCapabilities(proc.effective, *cacheValue->GetSourceBuffer()); auto inheritable = GetCapabilities(proc.inheritable, *cacheValue->GetSourceBuffer()); cacheValue->SetContentNoCopy<kUser>(userName); cacheValue->SetContentNoCopy<kCapPermitted>(permitted); cacheValue->SetContentNoCopy<kCapEffective>(effective); cacheValue->SetContentNoCopy<kCapInheritable>(inheritable); // event.process.nspid = proc.nspid; // event.process.auid = proc.auid; // event.process.flags = proc.flags; // event.process.cmdline = proc.cmdline; // event.kube.docker = proc.container_id; } return cacheValue; } std::shared_ptr<ProcessCacheValue> ProcessCacheManager::msgExecveEventToProcessCacheValue(const msg_execve_event& event) { auto cacheValue = std::make_shared<ProcessCacheValue>(); if (event.cleanup_key.ktime == 0 || (event.process.flags & EVENT_CLONE) != 0) { cacheValue->mPPid = event.parent.pid; cacheValue->mPKtime = event.parent.ktime; } else { // process created from execve only cacheValue->mPPid = event.cleanup_key.pid; cacheValue->mPKtime = event.cleanup_key.ktime; } auto execId = GenerateExecId(event.process.pid, event.process.ktime); auto userName = mProcParser.GetUserNameByUid(event.process.uid); auto permitted = GetCapabilities(event.creds.caps.permitted, *cacheValue->GetSourceBuffer()); auto effective = GetCapabilities(event.creds.caps.effective, *cacheValue->GetSourceBuffer()); auto inheritable = GetCapabilities(event.creds.caps.inheritable, *cacheValue->GetSourceBuffer()); fillProcessDataFields(event, *cacheValue); cacheValue->SetContent<kExecId>(execId); cacheValue->SetContent<kProcessId>(event.process.pid); cacheValue->SetContent<kUid>(event.process.uid); cacheValue->SetContent<kUser>(userName); cacheValue->SetContent<kKtime>(event.process.ktime); cacheValue->SetContentNoCopy<kCapPermitted>(permitted); cacheValue->SetContentNoCopy<kCapEffective>(effective); cacheValue->SetContentNoCopy<kCapInheritable>(inheritable); // parse exec // event->process.tid = eventPtr->process.tid; // event->process.nspid = eventPtr->process.nspid; // event->process.auid = eventPtr->process.auid; // event->process.secure_exec = eventPtr->process.secureexec; // event->process.nlink = eventPtr->process.i_nlink; // event->process.ino = eventPtr->process.i_ino; // dockerid // event->kube.docker = std::string(eventPtr->kube.docker_id); return cacheValue; } bool ProcessCacheManager::fillProcessDataFields(const msg_execve_event& event, ProcessCacheValue& cacheValue) { // When filename or args are in event.buffer, they are null terminated. // When they are in data_event, they are not null terminated. // args && filename static const StringView kENoMem = "enomem"; thread_local std::string filename; thread_local std::string argsdata; StringView args = kEmptyStringView; StringView cwd = kEmptyStringView; // verifier size // SIZEOF_EVENT is the total size of all fixed fields, = offsetof(msg_process, args) = 56 auto size = event.process.size - SIZEOF_EVENT; // remain size if (size > PADDED_BUFFER - SIZEOF_EVENT) { // size exceed args buffer size LOG_ERROR( sLogger, ("error", "msg exec size larger than argsbuffer")("pid", event.process.pid)("ktime", event.process.ktime)); cacheValue.SetContentNoCopy<kBinary>(kENoMem); cacheValue.SetContentNoCopy<kArguments>(kENoMem); cacheValue.SetContentNoCopy<kCWD>(kENoMem); return false; } // executable filename const char* buffer = event.buffer + SIZEOF_EVENT; // equivalent to eventPtr->process.args; if (event.process.flags & EVENT_DATA_FILENAME) { // filename should be in data cache if (size < sizeof(data_event_desc)) { LOG_ERROR(sLogger, ("EVENT_DATA_FILENAME", "msg exec size less than sizeof(data_event_desc)")( "pid", event.process.pid)("ktime", event.process.ktime)); cacheValue.SetContentNoCopy<kBinary>(kENoMem); cacheValue.SetContentNoCopy<kArguments>(kENoMem); cacheValue.SetContentNoCopy<kCWD>(kENoMem); return false; } const auto* desc = reinterpret_cast<const data_event_desc*>(buffer); LOG_DEBUG(sLogger, ("EVENT_DATA_FILENAME, size", desc->size)("leftover", desc->leftover)("pid", desc->id.pid)("ktime", desc->id.time)); filename = dataGetAndRemove(desc); if (filename.empty()) { LOG_WARNING( sLogger, ("EVENT_DATA_FILENAME", "not found in data cache")("pid", desc->id.pid)("ktime", desc->id.time)); } buffer += sizeof(data_event_desc); size -= sizeof(data_event_desc); } else if ((event.process.flags & EVENT_ERROR_FILENAME) == 0) { // filename should be in process.args const char* nullPos = std::find(buffer, buffer + size, '\0'); filename = std::string(buffer, nullPos - buffer); size -= nullPos - buffer; if (size == 0) { // no tailing \0 found buffer = nullPos; } else { buffer = nullPos + 1; // skip \0 --size; } } else { LOG_WARNING( sLogger, ("EVENT_DATA_FILENAME", "ebpf get data error")("pid", event.process.pid)("ktime", event.process.ktime)); filename.clear(); } // args & cmd if (event.process.flags & EVENT_DATA_ARGS) { // arguments should be in data cache if (size < sizeof(data_event_desc)) { LOG_ERROR(sLogger, ("EVENT_DATA_ARGS", "msg exec size less than sizeof(data_event_desc)")("pid", event.process.pid)( "ktime", event.process.ktime)); cacheValue.SetContent<kBinary>(filename); cacheValue.SetContentNoCopy<kArguments>(kENoMem); cacheValue.SetContentNoCopy<kCWD>(kENoMem); return false; } const auto* desc = reinterpret_cast<const data_event_desc*>(buffer); LOG_DEBUG(sLogger, ("EVENT_DATA_ARGS, size", desc->size)("leftover", desc->leftover)("pid", desc->id.pid)("ktime", desc->id.time)); argsdata = dataGetAndRemove(desc); if (argsdata.empty()) { LOG_WARNING(sLogger, ("EVENT_DATA_ARGS", "not found in data cache")("pid", desc->id.pid)("ktime", desc->id.time)); } args = argsdata; // the remaining data is cwd if (size > sizeof(data_event_desc)) { cwd = StringView(buffer + sizeof(data_event_desc), size - sizeof(data_event_desc)); } } else if (size > 0) { bool hasCwd = false; if (((event.process.flags & EVENT_NO_CWD_SUPPORT) | (event.process.flags & EVENT_ERROR_CWD) | (event.process.flags & EVENT_ROOT_CWD)) == 0) { hasCwd = true; } const char* nullPos = nullptr; args = StringView(buffer, size); if (hasCwd) { // find the last \0 to serapate args and cwd for (int i = size - 1; i >= 0; i--) { if (buffer[i] == '\0') { nullPos = buffer + i; break; } } if (nullPos == nullptr) { cwd = StringView(buffer, size); args = StringView(buffer, 0); } else { cwd = StringView(nullPos + 1, size - (nullPos - buffer + 1)); args = StringView(buffer, nullPos - buffer); } } } if (event.process.flags & EVENT_ERROR_ARGS) { LOG_WARNING(sLogger, ("EVENT_DATA_ARGS", "ebpf get data error")("pid", event.process.pid)("ktime", event.process.ktime)); } if (event.process.flags & EVENT_ERROR_CWD) { LOG_WARNING(sLogger, ("EVENT_DATA_CWD", "ebpf get data error")("pid", event.process.pid)("ktime", event.process.ktime)); } if (event.process.flags & EVENT_ERROR_PATH_COMPONENTS) { LOG_WARNING(sLogger, ("EVENT_DATA_CWD", "cwd too long, maybe truncated")("pid", event.process.pid)("ktime", event.process.ktime)); } // Post handle cwd if (event.process.flags & EVENT_ROOT_CWD) { cwd = "/"; } else if (event.process.flags & EVENT_PROCFS) { cwd = Trim(cwd); } cacheValue.SetContent<kCWD>(cwd); // Post handle args cacheValue.SetContent<kArguments>(DecodeArgs(args)); // Post handle binary if (filename.size()) { if (filename[0] == '/') { ; } else if (!cwd.empty()) { // argsdata is not used anymore, as args and cwd has already been SetContent argsdata.reserve(cwd.size() + 1 + filename.size()); if (cwd.back() != '/') { argsdata.assign(cwd.data(), cwd.size()).append("/").append(filename); } else { argsdata.assign(cwd.data(), cwd.size()).append(filename); } filename.swap(argsdata); } cacheValue.SetContent<kBinary>(filename); } else { LOG_WARNING(sLogger, ("filename is empty, should not happen. pid", event.process.pid)("ktime", event.process.ktime)); cacheValue.SetContentNoCopy<kBinary>(kENoMem); return false; } return true; } std::shared_ptr<ProcessCacheValue> ProcessCacheManager::msgCloneEventToProcessCacheValue(const msg_clone_event& event) { auto parent = mProcessCache.Lookup({event.parent.pid, event.parent.ktime}); if (!parent) { LOG_WARNING(sLogger, ("parent process not found. ppid", event.parent.pid)("pktime", event.parent.ktime)("pid", event.tgid)("ktime", event.ktime)); return nullptr; } auto execId = GenerateExecId(event.tgid, event.ktime); auto cacheValue = std::shared_ptr<ProcessCacheValue>(parent->CloneContents()); cacheValue->mPPid = event.parent.pid; cacheValue->mPKtime = event.parent.ktime; cacheValue->SetContent<kExecId>(execId); cacheValue->SetContent<kProcessId>(event.tgid); cacheValue->SetContent<kKtime>(event.ktime); return cacheValue; } void ProcessCacheManager::dataAdd(msg_data* dataPtr) { if (dataPtr->common.size < kDataArgOffset) { LOG_ERROR(sLogger, ("size is negative, dataPtr.common.size", dataPtr->common.size)("arg offset", kDataArgOffset)); return; } size_t size = dataPtr->common.size - kDataArgOffset; if (size <= MSG_DATA_ARG_LEN) { // std::vector<uint64_t> key = {dataPtr->id.pid, dataPtr->id.time}; std::lock_guard<std::mutex> lk(mDataMapMutex); auto res = mDataMap.find(dataPtr->id); if (res != mDataMap.end()) { auto& prevData = res->second; LOG_DEBUG(sLogger, ("already have data, pid", dataPtr->id.pid)("ktime", dataPtr->id.time)("prevData", prevData)( "data", std::string(dataPtr->arg, size))); prevData.append(dataPtr->arg, size); } else { // restrict memory usage in abnormal conditions // if there is some unused old data, clear it // if we cannot clear old data, just clear all if (mDataMap.size() >= kMaxDataMapSize && mLastDataMapClearTime < std::chrono::system_clock::now() - std::chrono::minutes(1)) { clearExpiredData(dataPtr->id.time); mLastDataMapClearTime = std::chrono::system_clock::now(); LOG_WARNING(sLogger, ("data map size exceed limit", kInitDataMapSize)("size", mDataMap.size())); } if (mDataMap.size() >= kMaxDataMapSize) { LOG_WARNING(sLogger, ("data map size exceed limit", kInitDataMapSize)("size", mDataMap.size())); mDataMap.clear(); } LOG_DEBUG(sLogger, ("no prev data, pid", dataPtr->id.pid)("ktime", dataPtr->id.time)("data", std::string(dataPtr->arg, size))); mDataMap[dataPtr->id] = std::string(dataPtr->arg, size); } } else { LOG_ERROR(sLogger, ("pid", dataPtr->id.pid)("ktime", dataPtr->id.time)("size limit exceeded", size)); } } std::string ProcessCacheManager::dataGetAndRemove(const data_event_desc* desc) { std::string data; { std::lock_guard<std::mutex> lk(mDataMapMutex); auto res = mDataMap.find(desc->id); if (res == mDataMap.end()) { return data; } data.swap(res->second); mDataMap.erase(res); } if (data.size() != desc->size - desc->leftover) { LOG_WARNING(sLogger, ("size bad! data size", data.size())("expect", desc->size - desc->leftover)); return ""; } return data; } void ProcessCacheManager::clearExpiredData(time_t ktime) { ktime -= kMaxCacheExpiredTimeout; for (auto it = mDataMap.begin(); it != mDataMap.end();) { if (time_t(it->first.time) < ktime) { it = mDataMap.erase(it); } else { ++it; } } } } // namespace ebpf } // namespace logtail