core/unittest/ebpf/AggregatorUnittest.cpp (204 lines of code) (raw):

// Copyright 2025 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include <json/json.h> #include <algorithm> #include <atomic> #include <chrono> #include <iostream> #include <random> #include "common/timer/Timer.h" #include "common/timer/TimerEvent.h" #include "ebpf/type/AggregateEvent.h" #include "ebpf/type/FileEvent.h" #include "ebpf/type/NetworkEvent.h" #include "ebpf/type/ProcessEvent.h" #include "ebpf/util/AggregateTree.h" #include "logger/Logger.h" #include "models/PipelineEventGroup.h" #include "unittest/Unittest.h" DECLARE_FLAG_BOOL(logtail_mode); class HT { public: std::unordered_map<int, std::string> tt; // val是数量 int val = 0; explicit HT(int val) : val(val) {} }; namespace logtail { namespace ebpf { class AggregatorUnittest : public testing::Test { public: AggregatorUnittest() { Timer::GetInstance()->Init(); } ~AggregatorUnittest() { Timer::GetInstance()->Stop(); } void TestBasicAgg(); void TestGetAndReset(); void TestAggManager(); void TestAggregator(); protected: void SetUp() override { agg = std::make_unique<SIZETAggTree<HT, std::vector<std::string>>>( 10, [](std::unique_ptr<HT>& base, const std::vector<std::string>& other) { APSARA_TEST_TRUE(base != nullptr); size_t i = 0; for (auto& key : other) { base->tt[i++] = key; } base->val++; }, [](const std::vector<std::string>& in, std::shared_ptr<SourceBuffer>& sourceBuffer) { // LOG_INFO(sLogger, ("enter generate ... ", "")); return std::make_unique<HT>(0); }); } void TearDown() override {} int GetSum() { return GetSum(*agg); } static int GetSum(SIZETAggTree<HT, std::vector<std::string>>& agg) { int result = 0; agg.ForEach([&result](const auto ht) { result += ht->val; }); return result; } int GetDataNodeCount() { return GetDataNodeCount(*agg); } inline size_t GetHashByDepth(const std::vector<std::string>& data, int depth) { size_t seed = 0UL; for (int i = 0; i < depth; i++) { seed ^= std::hash<std::string>{}(data[i]) + 0x9e3779b9 + (seed << 6) + (seed >> 2); } return seed; } bool Aggregate(const std::vector<std::string>& data, int depth) { return agg->Aggregate(data, std::array<size_t, 1>{GetHashByDepth(data, depth)}); } static int GetDataNodeCount(SIZETAggTree<HT, std::vector<std::string>>& agg) { int node_count = 0; agg.ForEach([&node_count](const auto ht) { node_count++; }); return node_count; } private: std::atomic_bool mFlag = true; std::vector<int> mVec; int mIntervalSec = 1; std::unique_ptr<SIZETAggTree<HT, std::vector<std::string>>> agg; std::unique_ptr<SIZETAggTree<FileEventGroup, std::shared_ptr<FileEvent>>> mAggregateTree; std::unique_ptr<SIZETAggTree<NetworkEventGroup, std::shared_ptr<NetworkEvent>>> mNetAggregateTree; }; std::array<size_t, 2> GenerateAggKey(const std::shared_ptr<FileEvent> event) { std::array<size_t, 2> hash_result; hash_result.fill(0UL); std::hash<std::string> hasher; hash_result[0] = uint64_t(event->mPid) ^ (event->mKtime >> 32) ^ (event->mKtime << 32); // LOG_INFO(sLogger, ("ktime", event->mKtime) ("hash result", hash_result[0])); // aggregate_tree_.Aggregate(); hash_result[1] ^= hasher(event->mPath) + 0x9e3779b9 + (hash_result[1] << 6) + (hash_result[1] >> 2); return hash_result; } void AggregatorUnittest::TestAggregator() { mAggregateTree = std::make_unique<SIZETAggTree<FileEventGroup, std::shared_ptr<FileEvent>>>( 4096, [this](std::unique_ptr<FileEventGroup>& base, const std::shared_ptr<FileEvent>& other) { base->mInnerEvents.emplace_back(std::move(other)); }, [this](const std::shared_ptr<FileEvent>& in, std::shared_ptr<SourceBuffer>& sourceBuffer) { LOG_INFO(sLogger, ("generate node", "")); return std::make_unique<FileEventGroup>(in->mPid, in->mKtime, in->mPath); }); std::vector<std::shared_ptr<FileEvent>> events; // process ===> 2 file events.push_back(std::make_shared<FileEvent>(100, 100, KernelEventType::FILE_MMAP, 0, "path-0")); events.push_back(std::make_shared<FileEvent>(100, 100, KernelEventType::FILE_PATH_TRUNCATE, 1, "path-0")); events.push_back(std::make_shared<FileEvent>(100, 100, KernelEventType::FILE_PATH_TRUNCATE, 2, "path-1")); events.push_back(std::make_shared<FileEvent>(100, 100, KernelEventType::FILE_PATH_TRUNCATE, 3, "path-1")); // process ===> 3 file events.push_back(std::make_shared<FileEvent>(1, 101, KernelEventType::FILE_MMAP, 4, "path-0")); events.push_back(std::make_shared<FileEvent>(1, 101, KernelEventType::FILE_PATH_TRUNCATE, 5, "path-1")); events.push_back(std::make_shared<FileEvent>(1, 101, KernelEventType::FILE_MMAP, 6, "path-2")); events.push_back(std::make_shared<FileEvent>(1, 101, KernelEventType::FILE_MMAP, 7, "path-0")); events.push_back(std::make_shared<FileEvent>(1, 101, KernelEventType::FILE_PATH_TRUNCATE, 8, "path-1")); events.push_back(std::make_shared<FileEvent>(1, 101, KernelEventType::FILE_MMAP, 9, "path-2")); for (auto evt : events) { auto key = GenerateAggKey(evt); // LOG_INFO(sLogger, ("key0", key[0]) ("key1", key[1]) ("path", evt->mPath) ("pid", evt->mPid) ("time", // evt->mKtime)); mAggregateTree->Aggregate(evt, key); } auto nodes = mAggregateTree->GetNodesWithAggDepth(1); APSARA_TEST_EQUAL(2UL, nodes.size()); int globalNodeCnt = 0; int globalEventCnt = 0; for (auto& node : nodes) { // convert to a item and push to process queue // represent a pid, ktime auto pid = node->mChild.begin()->second->mData->mPid; auto ktime = node->mChild.begin()->second->mData->mKtime; PipelineEventGroup eventGroup(std::make_shared<SourceBuffer>()); this->mAggregateTree->ForEach(node, [&](const FileEventGroup* group) { // path level APSARA_TEST_EQUAL(group->mPid, pid); APSARA_TEST_EQUAL(group->mKtime, ktime); globalNodeCnt++; LOG_WARNING(sLogger, ("pid", group->mPid)("ktime", group->mKtime)("path", group->mPath)); for (const auto& innerEvent : group->mInnerEvents) { globalEventCnt++; if (innerEvent->mTimestamp == 9) { APSARA_TEST_EQUAL(group->mPid, 1U); FileEvent* fe = static_cast<FileEvent*>(innerEvent.get()); APSARA_TEST_EQUAL(fe->mPath, "path-2"); } auto* logEvent = eventGroup.AddLogEvent(); auto ts = innerEvent->mTimestamp; auto seconds = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::nanoseconds(ts)); logEvent->SetTimestamp(seconds.count(), ts); if (innerEvent->mTimestamp) { } switch (innerEvent->mEventType) { case KernelEventType::FILE_PATH_TRUNCATE: { logEvent->SetContent("call_name", std::string("security_path_truncate")); logEvent->SetContent("event_type", std::string("kprobe")); break; } case KernelEventType::FILE_MMAP: { logEvent->SetContent("call_name", std::string("security_mmap_file")); logEvent->SetContent("event_type", std::string("kprobe")); break; } case KernelEventType::FILE_PERMISSION_EVENT: { logEvent->SetContent("call_name", std::string("security_file_permission")); logEvent->SetContent("event_type", std::string("kprobe")); break; } default: break; } } }); } APSARA_TEST_EQUAL(globalEventCnt, 10); APSARA_TEST_EQUAL(globalNodeCnt, 5); this->mAggregateTree->Reset(); } void AggregatorUnittest::TestAggManager() { // std::unique_ptr<AggregateEvent> event = std::make_unique<AggregateEvent>( // 1, // [this](const std::chrono::steady_clock::time_point& execTime) { // handler // if (!this->mFlag) { // return false; // } // this->mVec.push_back(1); // return true; // }, // [this]() { // validator // auto isStop = !this->mFlag.load(); // if (isStop) { // LOG_INFO(sLogger, ("stop schedule, mflag", this->mFlag)); // } // return isStop; // }); // Timer::GetInstance()->PushEvent(std::move(event)); // std::this_thread::sleep_for(std::chrono::seconds(4)); // mFlag = false; // std::this_thread::sleep_for(std::chrono::seconds(3)); // APSARA_TEST_EQUAL(mVec.size(), 3UL); // mFlag = true; // std::this_thread::sleep_for(std::chrono::seconds(3)); // APSARA_TEST_EQUAL(mVec.size(), 3UL); } void AggregatorUnittest::TestBasicAgg() { Aggregate({"a", "b", "c", "d"}, 4); Aggregate({"a", "b", "c", "d", "e"}, 4); Aggregate({"a", "b", "d", "r"}, 4); Aggregate({"a", "b", "c", "e"}, 4); Aggregate({"a", "b", "c"}, 3); APSARA_TEST_EQUAL(GetDataNodeCount(), 4); APSARA_TEST_EQUAL(agg->NodeCount(), 4UL); APSARA_TEST_EQUAL(GetSum(), 5); agg->Reset(); APSARA_TEST_EQUAL(GetDataNodeCount(), 0); APSARA_TEST_EQUAL(agg->NodeCount(), 0UL); APSARA_TEST_EQUAL(GetSum(), 0); } void AggregatorUnittest::TestGetAndReset() { Aggregate({"a", "b", "c", "d"}, 4); Aggregate({"a", "b", "c", "d", "e"}, 4); Aggregate({"a", "b", "d", "r"}, 4); Aggregate({"a", "b", "c", "e"}, 4); Aggregate({"a", "b", "c"}, 3); APSARA_TEST_EQUAL(GetDataNodeCount(), 4); APSARA_TEST_EQUAL(agg->NodeCount(), 4UL); APSARA_TEST_EQUAL(GetSum(), 5); auto newTree(agg->GetAndReset()); APSARA_TEST_EQUAL(GetDataNodeCount(), 0); APSARA_TEST_EQUAL(agg->NodeCount(), 0UL); APSARA_TEST_EQUAL(GetSum(), 0); APSARA_TEST_EQUAL(GetDataNodeCount(newTree), 4); APSARA_TEST_EQUAL(newTree.NodeCount(), 4UL); APSARA_TEST_EQUAL(GetSum(newTree), 5); } UNIT_TEST_CASE(AggregatorUnittest, TestBasicAgg); UNIT_TEST_CASE(AggregatorUnittest, TestGetAndReset); // UNIT_TEST_CASE(AggregatorUnittest, TestAggManager); UNIT_TEST_CASE(AggregatorUnittest, TestAggregator); } // namespace ebpf } // namespace logtail UNIT_TEST_MAIN