core/unittest/ebpf/NetworkObserverUnittest.cpp (524 lines of code) (raw):

// Copyright 2025 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include <chrono> #include <memory> #include <thread> #include "common/TimeUtil.h" #include "common/http/AsynCurlRunner.h" #include "common/queue/blockingconcurrentqueue.h" #include "ebpf/EBPFAdapter.h" #include "ebpf/EBPFServer.h" #include "ebpf/plugin/ProcessCacheManager.h" #include "ebpf/plugin/network_observer/NetworkObserverManager.h" #include "ebpf/protocol/ProtocolParser.h" #include "ebpf/type/NetworkObserverEvent.h" #include "metadata/K8sMetadata.h" #include "unittest/Unittest.h" namespace logtail { namespace ebpf { class NetworkObserverManagerUnittest : public ::testing::Test { public: void TestInitialization(); void TestEventHandling(); void TestDataEventProcessing(); void TestWhitelistManagement(); void TestPerfBufferOperations(); void TestRecordProcessing(); void TestRollbackProcessing(); void TestConfigUpdate(); void TestErrorHandling(); void TestPluginLifecycle(); void TestHandleHostMetadataUpdate(); void TestPeriodicalTask(); void BenchmarkConsumeTask(); protected: void SetUp() override { Timer::GetInstance()->Init(); AsynCurlRunner::GetInstance()->Stop(); mEBPFAdapter = std::make_shared<EBPFAdapter>(); mEBPFAdapter->Init(); mProcessCacheManager = std::make_shared<ProcessCacheManager>( mEBPFAdapter, "test_host", "/", mEventQueue, nullptr, nullptr, nullptr, nullptr); ProtocolParserManager::GetInstance().AddParser(support_proto_e::ProtoHTTP); mManager = NetworkObserverManager::Create(mProcessCacheManager, mEBPFAdapter, mEventQueue, nullptr); EBPFServer::GetInstance()->UpdatePluginManager(PluginType::NETWORK_OBSERVE, mManager); } void TearDown() override { Timer::GetInstance()->Stop(); AsynCurlRunner::GetInstance()->Stop(); mManager->Destroy(); EBPFServer::GetInstance()->UpdatePluginManager(PluginType::NETWORK_OBSERVE, nullptr); } private: std::shared_ptr<NetworkObserverManager> CreateManager() { return NetworkObserverManager::Create(mProcessCacheManager, mEBPFAdapter, mEventQueue, nullptr); } std::shared_ptr<EBPFAdapter> mEBPFAdapter; std::shared_ptr<ProcessCacheManager> mProcessCacheManager; moodycamel::BlockingConcurrentQueue<std::shared_ptr<CommonEvent>> mEventQueue; std::shared_ptr<NetworkObserverManager> mManager; }; void NetworkObserverManagerUnittest::TestInitialization() { // auto mManager = CreateManager(); EXPECT_NE(mManager, nullptr); ObserverNetworkOption options; options.mEnableProtocols = {"HTTP", "MySQL", "Redis"}; options.mEnableCids = {"container1", "container2"}; options.mDisableCids = {"container3"}; int result = mManager->Init(std::variant<SecurityOptions*, ObserverNetworkOption*>(&options)); EXPECT_EQ(result, 0); EXPECT_EQ(mManager->GetPluginType(), PluginType::NETWORK_OBSERVE); } void NetworkObserverManagerUnittest::TestEventHandling() { // auto mManager = NetworkObserverManager::Create(mProcessCacheManager, mEBPFAdapter, mEventQueue, nullptr); EXPECT_NE(mManager, nullptr); ObserverNetworkOption options; options.mEnableProtocols = {"HTTP"}; mManager->Init(std::variant<SecurityOptions*, ObserverNetworkOption*>(&options)); struct conn_ctrl_event_t connectEvent = {}; connectEvent.conn_id.fd = 1; connectEvent.conn_id.tgid = 1000; connectEvent.conn_id.start = 123456; connectEvent.type = EventConnect; mManager->AcceptNetCtrlEvent(&connectEvent); struct conn_stats_event_t statsEvent = {}; statsEvent.conn_id = connectEvent.conn_id; statsEvent.protocol = support_proto_e::ProtoHTTP; statsEvent.role = support_role_e::IsClient; statsEvent.si.family = AF_INET; statsEvent.si.netns = 12345; statsEvent.si.ap.saddr = 0x0100007F; // 127.0.0.1 statsEvent.si.ap.daddr = 0x0101A8C0; // 192.168.1.1 statsEvent.si.ap.sport = htons(8080); statsEvent.si.ap.dport = htons(80); mManager->AcceptNetStatsEvent(&statsEvent); struct conn_ctrl_event_t closeEvent = connectEvent; closeEvent.type = EventClose; mManager->AcceptNetCtrlEvent(&closeEvent); mManager->RecordEventLost(callback_type_e::CTRL_HAND, 1); mManager->RecordEventLost(callback_type_e::INFO_HANDLE, 2); mManager->RecordEventLost(callback_type_e::STAT_HAND, 3); } std::shared_ptr<Connection> CreateTestTracker() { ConnId connId(1, 1000, 123456); return std::make_shared<Connection>(connId); } conn_data_event_t* CreateHttpDataEvent() { const std::string resp = "HTTP/1.1 200 OK\r\n" "Content-Type: text/html\r\n" "Content-Length: 13\r\n" "\r\n" "Hello, World!"; const std::string req = "GET /index.html HTTP/1.1\r\nHost: www.cmonitor.ai\r\nAccept: image/gif, image/jpeg, " "*/*\r\nUser-Agent: Mozilla/5.0 (X11; Linux x86_64)\r\n\r\n"; std::string msg = req + resp; conn_data_event_t* evt = (conn_data_event_t*)malloc(offsetof(conn_data_event_t, msg) + msg.size()); memcpy(evt->msg, msg.data(), msg.size()); evt->conn_id.fd = 0; evt->conn_id.start = 1; evt->conn_id.tgid = 2; evt->role = support_role_e::IsClient; evt->request_len = req.size(); evt->response_len = resp.size(); evt->protocol = support_proto_e::ProtoHTTP; evt->start_ts = 1; evt->end_ts = 2; return evt; } conn_data_event_t* CreateHttpDataEvent(int i) { const std::string resp = "HTTP/1.1 200 OK\r\n" "Content-Type: text/html\r\n" "Content-Length: 13\r\n" "\r\n" "Hello, World!"; const std::string req = "GET /index.html/" + std::to_string(i) + " HTTP/1.1\r\nHost: www.cmonitor.ai\r\nAccept: image/gif, image/jpeg, " "*/*\r\nUser-Agent: Mozilla/5.0 (X11; Linux x86_64)\r\n\r\n"; std::string msg = req + resp; conn_data_event_t* evt = (conn_data_event_t*)malloc(offsetof(conn_data_event_t, msg) + msg.size()); memcpy(evt->msg, msg.data(), msg.size()); evt->conn_id.fd = 0; evt->conn_id.start = 1; evt->conn_id.tgid = 2; evt->role = support_role_e::IsClient; evt->request_len = req.size(); evt->response_len = resp.size(); evt->protocol = support_proto_e::ProtoHTTP; evt->start_ts = 1; evt->end_ts = 2; return evt; } conn_stats_event_t CreateConnStatsEvent() { struct conn_stats_event_t statsEvent = {}; statsEvent.protocol = support_proto_e::ProtoHTTP; statsEvent.role = support_role_e::IsClient; statsEvent.si.family = AF_INET; statsEvent.si.ap.saddr = 0x0100007F; // 127.0.0.1 statsEvent.si.ap.daddr = 0x0101A8C0; // 192.168.1.1 statsEvent.si.ap.sport = htons(8080); statsEvent.si.ap.dport = htons(80); statsEvent.ts = 1; // set docker id statsEvent.wr_bytes = 1; statsEvent.conn_id.fd = 0; statsEvent.conn_id.start = 1; statsEvent.conn_id.tgid = 2; // docker id std::string testCid = "/machine.slice/libpod-80b2ea13472c0d75a71af598ae2c01909bb5880151951bf194a3b24a44613106.scope"; memcpy(statsEvent.docker_id, testCid.c_str(), testCid.size()); return statsEvent; } void NetworkObserverManagerUnittest::TestDataEventProcessing() { // auto mManager = CreateManager(); ObserverNetworkOption options; options.mEnableProtocols = {"HTTP"}; mManager->Init(std::variant<SecurityOptions*, ObserverNetworkOption*>(&options)); mManager->Destroy(); auto statsEvent = CreateConnStatsEvent(); mManager->AcceptNetStatsEvent(&statsEvent); auto* dataEvent = CreateHttpDataEvent(); // TODO @qianlu.kk mManager->AcceptDataEvent(dataEvent); free(dataEvent); std::vector<std::shared_ptr<AbstractRecord>> items(10, nullptr); size_t count = mManager->mRollbackQueue.wait_dequeue_bulk_timed(items.data(), 1024, std::chrono::milliseconds(200)); APSARA_TEST_EQUAL(count, 1UL); APSARA_TEST_TRUE(items[0] != nullptr); AbstractAppRecord* record = static_cast<AbstractAppRecord*>(items[0].get()); APSARA_TEST_TRUE(record != nullptr); auto conn = record->GetConnection(); APSARA_TEST_TRUE(conn != nullptr); APSARA_TEST_TRUE(mManager->mConnectionManager->getConnection(conn->GetConnId()) != nullptr); // destroy connection conn->MarkClose(); for (size_t i = 0; i < 12; i++) { mManager->mConnectionManager->Iterations(); } // connection that record holds still available APSARA_TEST_TRUE(mManager->mConnectionManager->getConnection(conn->GetConnId()) == nullptr); // verify attributes HttpRecord* httpRecord = static_cast<HttpRecord*>(record); // http attrs APSARA_TEST_EQUAL(httpRecord->GetPath(), "/index.html"); APSARA_TEST_EQUAL(httpRecord->GetSpanName(), "/index.html"); APSARA_TEST_EQUAL(httpRecord->GetStatusCode(), 200); APSARA_TEST_EQUAL(httpRecord->GetStartTimeStamp(), 1UL); APSARA_TEST_EQUAL(httpRecord->GetEndTimeStamp(), 2UL); auto& attrs = httpRecord->GetConnection()->GetConnTrackerAttrs(); APSARA_TEST_EQUAL(attrs[kConnTrackerTable.ColIndex(kLocalAddr.Name())], "127.0.0.1:8080"); APSARA_TEST_EQUAL(attrs[kConnTrackerTable.ColIndex(kRemoteAddr.Name())], "192.168.1.1:80"); APSARA_TEST_EQUAL(attrs[kConnTrackerTable.ColIndex(kRpcType.Name())], "25"); APSARA_TEST_EQUAL(attrs[kConnTrackerTable.ColIndex(kCallKind.Name())], "http_client"); APSARA_TEST_EQUAL(attrs[kConnTrackerTable.ColIndex(kCallType.Name())], "http_client"); } void NetworkObserverManagerUnittest::TestWhitelistManagement() { // auto mManager = CreateManager(); ObserverNetworkOption options; options.mEnableProtocols = {"HTTP"}; mManager->Init(std::variant<SecurityOptions*, ObserverNetworkOption*>(&options)); std::vector<std::string> enableCids = {"container1", "container2"}; std::vector<std::string> disableCids; mManager->UpdateWhitelists(std::move(enableCids), std::move(disableCids)); enableCids.clear(); disableCids = {"container3", "container4"}; mManager->UpdateWhitelists(std::move(enableCids), std::move(disableCids)); enableCids = {"container5"}; disableCids = {"container6"}; mManager->UpdateWhitelists(std::move(enableCids), std::move(disableCids)); } void NetworkObserverManagerUnittest::TestPerfBufferOperations() { // auto mManager = CreateManager(); ObserverNetworkOption options; options.mEnableProtocols = {"HTTP"}; mManager->Init(std::variant<SecurityOptions*, ObserverNetworkOption*>(&options)); int result = mManager->PollPerfBuffer(); EXPECT_EQ(result, 0); for (int i = 0; i < 5; i++) { result = mManager->PollPerfBuffer(); EXPECT_EQ(result, 0); } } void NetworkObserverManagerUnittest::TestRecordProcessing() { // auto mManager = CreateManager(); ObserverNetworkOption options; options.mEnableProtocols = {"HTTP"}; options.mEnableLog = true; options.mEnableMetric = true; options.mEnableSpan = true; options.mSampleRate = 1; mManager->Init(std::variant<SecurityOptions*, ObserverNetworkOption*>(&options)); auto podInfo = std::make_shared<K8sPodInfo>(); podInfo->mContainerIds = {"1", "2"}; podInfo->mAppName = "test-app-name"; podInfo->mAppId = "test-app-id"; podInfo->mPodIp = "test-pod-ip"; podInfo->mPodName = "test-pod-name"; podInfo->mNamespace = "test-namespace"; podInfo->mWorkloadKind = "Deployment"; podInfo->mWorkloadName = "test-workloadname"; LOG_INFO(sLogger, ("step", "0-0")); K8sMetadata::GetInstance().mContainerCache.insert( "80b2ea13472c0d75a71af598ae2c01909bb5880151951bf194a3b24a44613106", podInfo); auto peerPodInfo = std::make_shared<K8sPodInfo>(); peerPodInfo->mContainerIds = {"3", "4"}; peerPodInfo->mPodIp = "peer-pod-ip"; peerPodInfo->mPodName = "peer-pod-name"; peerPodInfo->mNamespace = "peer-namespace"; K8sMetadata::GetInstance().mIpCache.insert("192.168.1.1", peerPodInfo); auto statsEvent = CreateConnStatsEvent(); mManager->AcceptNetStatsEvent(&statsEvent); auto cnn = mManager->mConnectionManager->getConnection({0, 2, 1}); APSARA_TEST_TRUE(cnn != nullptr); APSARA_TEST_TRUE(cnn->IsL7MetaAttachReady()); APSARA_TEST_TRUE(cnn->IsPeerMetaAttachReady()); APSARA_TEST_TRUE(cnn->IsSelfMetaAttachReady()); APSARA_TEST_TRUE(cnn->IsL4MetaAttachReady()); APSARA_TEST_TRUE(cnn->IsMetaAttachReadyForAppRecord()); // Generate 10 records for (size_t i = 0; i < 100; i++) { auto* dataEvent = CreateHttpDataEvent(i); mManager->AcceptDataEvent(dataEvent); free(dataEvent); } std::this_thread::sleep_for(std::chrono::milliseconds(400)); // verify auto now = std::chrono::steady_clock::now(); LOG_INFO(sLogger, ("====== consume span ======", "")); APSARA_TEST_TRUE(mManager->ConsumeSpanAggregateTree(now)); APSARA_TEST_EQUAL(mManager->mSpanEventGroups.size(), 1UL); APSARA_TEST_EQUAL(mManager->mSpanEventGroups[0].GetEvents().size(), 100UL); auto tags = mManager->mSpanEventGroups[0].GetTags(); for (const auto& tag : tags) { LOG_INFO(sLogger, ("dump span tags", "")(std::string(tag.first), std::string(tag.second))); } APSARA_TEST_EQUAL(tags.size(), 6UL); APSARA_TEST_EQUAL(tags["service.name"], "test-app-name"); APSARA_TEST_EQUAL(tags["arms.appId"], "test-app-id"); APSARA_TEST_EQUAL(tags["host.ip"], "127.0.0.1"); APSARA_TEST_EQUAL(tags["host.name"], "127.0.0.1"); APSARA_TEST_EQUAL(tags["arms.app.type"], "ebpf"); APSARA_TEST_EQUAL(tags["data_type"], "trace"); // used for route LOG_INFO(sLogger, ("====== consume metric ======", "")); APSARA_TEST_TRUE(mManager->ConsumeMetricAggregateTree(now)); APSARA_TEST_EQUAL(mManager->mMetricEventGroups.size(), 1UL); APSARA_TEST_EQUAL(mManager->mMetricEventGroups[0].GetEvents().size(), 301UL); tags = mManager->mMetricEventGroups[0].GetTags(); for (const auto& tag : tags) { LOG_INFO(sLogger, ("dump metric tags", "")(std::string(tag.first), std::string(tag.second))); } APSARA_TEST_EQUAL(tags.size(), 6UL); APSARA_TEST_EQUAL(tags["service"], "test-app-name"); APSARA_TEST_EQUAL(tags["pid"], "test-app-id"); APSARA_TEST_EQUAL(tags["serverIp"], "127.0.0.1"); APSARA_TEST_EQUAL(tags["host"], "127.0.0.1"); APSARA_TEST_EQUAL(tags["source"], "ebpf"); APSARA_TEST_EQUAL(tags["data_type"], "metric"); // used for route LOG_INFO(sLogger, ("====== consume log ======", "")); APSARA_TEST_TRUE(mManager->ConsumeLogAggregateTree(now)); APSARA_TEST_EQUAL(mManager->mLogEventGroups.size(), 1UL); APSARA_TEST_EQUAL(mManager->mLogEventGroups[0].GetEvents().size(), 100UL); tags = mManager->mLogEventGroups[0].GetTags(); APSARA_TEST_EQUAL(tags.size(), 1UL); } // TEST RollBack mechanism void NetworkObserverManagerUnittest::TestRollbackProcessing() { // case1. caused by conn stats event comes later than data event ... { // auto mManager = CreateManager(); ObserverNetworkOption options; options.mEnableProtocols = {"HTTP"}; options.mEnableLog = true; options.mEnableMetric = true; options.mEnableSpan = true; mManager->Init(std::variant<SecurityOptions*, ObserverNetworkOption*>(&options)); auto podInfo = std::make_shared<K8sPodInfo>(); podInfo->mContainerIds = {"1", "2"}; podInfo->mAppName = "test-app-name"; podInfo->mAppId = "test-app-id"; podInfo->mPodIp = "test-pod-ip"; podInfo->mPodName = "test-pod-name"; podInfo->mNamespace = "test-namespace"; podInfo->mWorkloadKind = "Deployment"; podInfo->mWorkloadName = "test-workloadname"; LOG_INFO(sLogger, ("step", "0-0")); K8sMetadata::GetInstance().mContainerCache.insert( "80b2ea13472c0d75a71af598ae2c01909bb5880151951bf194a3b24a44613106", podInfo); auto peerPodInfo = std::make_shared<K8sPodInfo>(); peerPodInfo->mContainerIds = {"3", "4"}; peerPodInfo->mPodIp = "peer-pod-ip"; peerPodInfo->mPodName = "peer-pod-name"; peerPodInfo->mNamespace = "peer-namespace"; K8sMetadata::GetInstance().mIpCache.insert("192.168.1.1", peerPodInfo); // Generate 10 records for (size_t i = 0; i < 100; i++) { auto* dataEvent = CreateHttpDataEvent(i); mManager->AcceptDataEvent(dataEvent); free(dataEvent); } auto cnn = mManager->mConnectionManager->getConnection({0, 2, 1}); APSARA_TEST_FALSE(cnn->IsMetaAttachReadyForAppRecord()); std::this_thread::sleep_for(std::chrono::milliseconds(500)); // conn stats arrive auto statsEvent = CreateConnStatsEvent(); mManager->AcceptNetStatsEvent(&statsEvent); APSARA_TEST_TRUE(cnn != nullptr); APSARA_TEST_TRUE(cnn->IsL7MetaAttachReady()); APSARA_TEST_TRUE(cnn->IsPeerMetaAttachReady()); APSARA_TEST_TRUE(cnn->IsSelfMetaAttachReady()); APSARA_TEST_TRUE(cnn->IsL4MetaAttachReady()); APSARA_TEST_TRUE(cnn->IsMetaAttachReadyForAppRecord()); APSARA_TEST_EQUAL(mManager->mDropRecordTotal, 0); APSARA_TEST_EQUAL(mManager->mRollbackRecordTotal, 100); std::this_thread::sleep_for(std::chrono::seconds(5)); APSARA_TEST_EQUAL(mManager->mDropRecordTotal, 0); APSARA_TEST_EQUAL(mManager->mRollbackRecordTotal, 100); // Generate 10 records for (size_t i = 0; i < 100; i++) { auto* dataEvent = CreateHttpDataEvent(i); mManager->AcceptDataEvent(dataEvent); free(dataEvent); } std::this_thread::sleep_for(std::chrono::milliseconds(500)); APSARA_TEST_EQUAL(mManager->mDropRecordTotal, 0); APSARA_TEST_EQUAL(mManager->mRollbackRecordTotal, 100); } // case2. caused by fetch metadata from server ... { // mock data event // mock conn stats event // mock iterations // mock async fetch metadata // verify } // case3. caused by no conn stats received ... // conn stats data may loss {} } void NetworkObserverManagerUnittest::TestConfigUpdate() { // for protocol update { // auto mManager = CreateManager(); ObserverNetworkOption options; options.mEnableProtocols = {"http"}; std::cout << magic_enum::enum_name(support_proto_e::ProtoHTTP) << std::endl; mManager->Init(std::variant<SecurityOptions*, ObserverNetworkOption*>(&options)); APSARA_TEST_TRUE(mManager->mPreviousOpt != nullptr); APSARA_TEST_EQUAL(mManager->mPreviousOpt->mEnableProtocols.size(), 1UL); APSARA_TEST_EQUAL(mManager->mPreviousOpt->mEnableProtocols[0], "http"); // only http APSARA_TEST_EQUAL(ProtocolParserManager::GetInstance().mParsers.size(), 1UL); APSARA_TEST_TRUE(ProtocolParserManager::GetInstance().mParsers.count(support_proto_e::ProtoHTTP) > 0); APSARA_TEST_TRUE(ProtocolParserManager::GetInstance().mParsers[support_proto_e::ProtoHTTP] != nullptr); options.mEnableProtocols = {"MySQL", "Redis", "Dubbo"}; // std::vector<std::string> protocols = {"MySQL", "Redis", "Dubbo"}; int result = mManager->Update(std::variant<SecurityOptions*, ObserverNetworkOption*>(&options)); APSARA_TEST_EQUAL(result, 0); APSARA_TEST_EQUAL(ProtocolParserManager::GetInstance().mParsers.size(), 0UL); // protocols = {"HTTP", "MySQL"}; options.mEnableProtocols = {"HTTP", "MySQL"}; result = mManager->Update(std::variant<SecurityOptions*, ObserverNetworkOption*>(&options)); APSARA_TEST_EQUAL(result, 0); APSARA_TEST_EQUAL(ProtocolParserManager::GetInstance().mParsers.size(), 1UL); APSARA_TEST_TRUE(ProtocolParserManager::GetInstance().mParsers.count(support_proto_e::ProtoHTTP) > 0); APSARA_TEST_TRUE(ProtocolParserManager::GetInstance().mParsers[support_proto_e::ProtoHTTP] != nullptr); // protocols.clear(); options.mEnableProtocols = {}; result = mManager->Update(std::variant<SecurityOptions*, ObserverNetworkOption*>(&options)); APSARA_TEST_EQUAL(result, 0); APSARA_TEST_EQUAL(ProtocolParserManager::GetInstance().mParsers.size(), 0UL); mManager->Destroy(); } // for enable log // for protocol update { ObserverNetworkOption options; options.mEnableProtocols = {"http"}; options.mEnableLog = false; options.mEnableMetric = true; options.mEnableSpan = true; mManager->Init(std::variant<SecurityOptions*, ObserverNetworkOption*>(&options)); APSARA_TEST_TRUE(mManager->mPreviousOpt != nullptr); APSARA_TEST_EQUAL(mManager->mPreviousOpt->mEnableProtocols.size(), 1UL); APSARA_TEST_EQUAL(mManager->mPreviousOpt->mEnableProtocols[0], "http"); // only http APSARA_TEST_EQUAL(ProtocolParserManager::GetInstance().mParsers.size(), 1UL); APSARA_TEST_TRUE(ProtocolParserManager::GetInstance().mParsers.count(support_proto_e::ProtoHTTP) > 0); APSARA_TEST_TRUE(ProtocolParserManager::GetInstance().mParsers[support_proto_e::ProtoHTTP] != nullptr); APSARA_TEST_EQUAL(mManager->mPreviousOpt->mEnableLog, false); APSARA_TEST_EQUAL(mManager->mPreviousOpt->mEnableMetric, true); APSARA_TEST_EQUAL(mManager->mPreviousOpt->mEnableSpan, true); options.mEnableProtocols = {"MySQL", "Redis", "Dubbo"}; options.mEnableLog = true; options.mEnableMetric = false; options.mEnableSpan = false; // std::vector<std::string> protocols = {"MySQL", "Redis", "Dubbo"}; int result = mManager->Update(std::variant<SecurityOptions*, ObserverNetworkOption*>(&options)); APSARA_TEST_EQUAL(result, 0); APSARA_TEST_EQUAL(ProtocolParserManager::GetInstance().mParsers.size(), 0UL); APSARA_TEST_EQUAL(mManager->mPreviousOpt->mEnableLog, true); APSARA_TEST_EQUAL(mManager->mPreviousOpt->mEnableMetric, false); APSARA_TEST_EQUAL(mManager->mPreviousOpt->mEnableSpan, false); // protocols = {"HTTP", "MySQL"}; options.mEnableProtocols = {"HTTP", "MySQL"}; options.mEnableLog = true; options.mEnableMetric = true; options.mEnableSpan = false; result = mManager->Update(std::variant<SecurityOptions*, ObserverNetworkOption*>(&options)); APSARA_TEST_EQUAL(result, 0); APSARA_TEST_EQUAL(ProtocolParserManager::GetInstance().mParsers.size(), 1UL); APSARA_TEST_TRUE(ProtocolParserManager::GetInstance().mParsers.count(support_proto_e::ProtoHTTP) > 0); APSARA_TEST_TRUE(ProtocolParserManager::GetInstance().mParsers[support_proto_e::ProtoHTTP] != nullptr); APSARA_TEST_EQUAL(mManager->mPreviousOpt->mEnableLog, true); APSARA_TEST_EQUAL(mManager->mPreviousOpt->mEnableMetric, true); APSARA_TEST_EQUAL(mManager->mPreviousOpt->mEnableSpan, false); // protocols.clear(); options.mEnableProtocols = {}; result = mManager->Update(std::variant<SecurityOptions*, ObserverNetworkOption*>(&options)); APSARA_TEST_EQUAL(result, 0); APSARA_TEST_EQUAL(ProtocolParserManager::GetInstance().mParsers.size(), 0UL); APSARA_TEST_EQUAL(mManager->mPreviousOpt->mEnableLog, true); APSARA_TEST_EQUAL(mManager->mPreviousOpt->mEnableMetric, true); APSARA_TEST_EQUAL(mManager->mPreviousOpt->mEnableSpan, false); } } void NetworkObserverManagerUnittest::TestPluginLifecycle() { // auto mManager = CreateManager(); ObserverNetworkOption options; options.mEnableProtocols = {"HTTP"}; int result = mManager->Init(std::variant<SecurityOptions*, ObserverNetworkOption*>(&options)); EXPECT_EQ(result, 0); // case1: udpate // suspend // update // destroy // case2: init and stop // case3: stop and re-run options.mEnableProtocols = {"HTTP", "MySQL"}; result = mManager->Update(std::variant<SecurityOptions*, ObserverNetworkOption*>(&options)); EXPECT_EQ(result, 0); result = mManager->Destroy(); EXPECT_EQ(result, 0); } std::shared_ptr<K8sPodInfo> CreatePodInfo(const std::string& cid) { auto podInfo = std::make_shared<K8sPodInfo>(); podInfo->mContainerIds = {cid}; podInfo->mPodIp = "test-pod-ip"; podInfo->mPodName = "test-pod-name"; podInfo->mNamespace = "test-namespace"; podInfo->mAppId = cid + "-test-app-id"; podInfo->mAppName = cid + "-test-app-name"; return podInfo; } void NetworkObserverManagerUnittest::TestHandleHostMetadataUpdate() { std::vector<std::string> cidLists0 = {"1", "2", "3", "4", "5"}; for (auto cid : cidLists0) { K8sMetadata::GetInstance().mContainerCache.insert(cid, CreatePodInfo(cid)); } mManager->HandleHostMetadataUpdate({"1", "2", "3", "4"}); APSARA_TEST_EQUAL(mManager->mEnableCids.size(), 4); APSARA_TEST_EQUAL(mManager->mDisableCids.size(), 0); mManager->HandleHostMetadataUpdate({"2", "3", "4", "5"}); APSARA_TEST_EQUAL(mManager->mEnableCids.size(), 1); // only add "5" APSARA_TEST_EQUAL(mManager->mDisableCids.size(), 1); // delete "1" mManager->HandleHostMetadataUpdate({"4", "5", "6"}); APSARA_TEST_EQUAL(mManager->mEnableCids.size(), 0); APSARA_TEST_EQUAL(mManager->mDisableCids.size(), 2); // delete "2" "3" } void NetworkObserverManagerUnittest::TestPeriodicalTask() { // manager init, will execute mManager->mFlag = true; Timer::GetInstance()->Clear(); EBPFServer::GetInstance()->UpdatePluginManager(PluginType::NETWORK_OBSERVE, mManager); auto now = std::chrono::steady_clock::now(); std::shared_ptr<ScheduleConfig> metricConfig = std::make_shared<NetworkObserverScheduleConfig>(std::chrono::seconds(15), JobType::METRIC_AGG); std::shared_ptr<ScheduleConfig> spanConfig = std::make_shared<NetworkObserverScheduleConfig>(std::chrono::seconds(2), JobType::SPAN_AGG); std::shared_ptr<ScheduleConfig> logConfig = std::make_shared<NetworkObserverScheduleConfig>(std::chrono::seconds(2), JobType::LOG_AGG); mManager->ScheduleNext(now, metricConfig); mManager->ScheduleNext(now, spanConfig); mManager->ScheduleNext(now, logConfig); APSARA_TEST_EQUAL(mManager->mExecTimes, 4); std::this_thread::sleep_for(std::chrono::seconds(3)); APSARA_TEST_EQUAL(mManager->mExecTimes, 6); std::this_thread::sleep_for(std::chrono::seconds(2)); APSARA_TEST_EQUAL(mManager->mExecTimes, 8); std::this_thread::sleep_for(std::chrono::seconds(2)); APSARA_TEST_EQUAL(mManager->mExecTimes, 10); std::this_thread::sleep_for(std::chrono::seconds(2)); APSARA_TEST_EQUAL(mManager->mExecTimes, 12); std::this_thread::sleep_for(std::chrono::seconds(2)); APSARA_TEST_EQUAL(mManager->mExecTimes, 14); std::this_thread::sleep_for(std::chrono::seconds(2)); APSARA_TEST_EQUAL(mManager->mExecTimes, 16); std::this_thread::sleep_for(std::chrono::seconds(3)); // execute 2 metric task APSARA_TEST_EQUAL(mManager->mExecTimes, 20); } void NetworkObserverManagerUnittest::BenchmarkConsumeTask() { } UNIT_TEST_CASE(NetworkObserverManagerUnittest, TestInitialization); UNIT_TEST_CASE(NetworkObserverManagerUnittest, TestEventHandling); UNIT_TEST_CASE(NetworkObserverManagerUnittest, TestDataEventProcessing); UNIT_TEST_CASE(NetworkObserverManagerUnittest, TestWhitelistManagement); UNIT_TEST_CASE(NetworkObserverManagerUnittest, TestPerfBufferOperations); UNIT_TEST_CASE(NetworkObserverManagerUnittest, TestRecordProcessing); UNIT_TEST_CASE(NetworkObserverManagerUnittest, TestRollbackProcessing); UNIT_TEST_CASE(NetworkObserverManagerUnittest, TestConfigUpdate); UNIT_TEST_CASE(NetworkObserverManagerUnittest, TestPluginLifecycle); UNIT_TEST_CASE(NetworkObserverManagerUnittest, TestHandleHostMetadataUpdate); UNIT_TEST_CASE(NetworkObserverManagerUnittest, TestPeriodicalTask); UNIT_TEST_CASE(NetworkObserverManagerUnittest, BenchmarkConsumeTask); } // namespace ebpf } // namespace logtail UNIT_TEST_MAIN