core/unittest/ebpf/ConnectionUnittest.cpp (179 lines of code) (raw):

// Copyright 2025 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include <gtest/gtest.h> #include <chrono> #include <memory> #include <thread> #include "ebpf/plugin/network_observer/Connection.h" #include "ebpf/plugin/network_observer/ConnectionManager.h" #include "metadata/K8sMetadata.h" #include "unittest/Unittest.h" namespace logtail { namespace ebpf { class ConnectionUnittest : public ::testing::Test { public: void TestBasicOperations(); void TestStateTransitions(); void TestProtocolHandling(); void TestMetadataManagement(); protected: void SetUp() override {} void TearDown() override {} private: std::shared_ptr<Connection> CreateTestTracker() { ConnId connId(1, 1000, 123456); return std::make_shared<Connection>(connId); } void ValidateTrackerState(const std::shared_ptr<Connection>& tracker, bool expectedClose, support_role_e expectedRole, int expectedEpoch) { APSARA_TEST_EQUAL(tracker->IsClose(), expectedClose); support_role_e role = tracker->GetRole(); APSARA_TEST_EQUAL(role, expectedRole); APSARA_TEST_EQUAL(tracker->GetEpoch(), expectedEpoch); } }; void ConnectionUnittest::TestBasicOperations() { auto tracker = CreateTestTracker(); ValidateTrackerState(tracker, false, support_role_e::IsUnknown, 4); ConnId expectedId(1, 1000, 123456); APSARA_TEST_EQUAL(tracker->GetConnId(), expectedId); tracker->RecordActive(); auto now = std::chrono::steady_clock::now(); APSARA_TEST_FALSE(tracker->ReadyToDestroy(now)); tracker->CountDown(); APSARA_TEST_EQUAL(tracker->GetEpoch(), 3); } void ConnectionUnittest::TestStateTransitions() { auto tracker = CreateTestTracker(); ValidateTrackerState(tracker, false, support_role_e::IsUnknown, 4); struct conn_stats_event_t statsEvent = {}; statsEvent.protocol = support_proto_e::ProtoHTTP; statsEvent.role = support_role_e::IsClient; statsEvent.si.family = AF_INET; statsEvent.si.ap.saddr = 0x0100007F; // 127.0.0.1 statsEvent.si.ap.daddr = 0x0101A8C0; // 192.168.1.1 statsEvent.si.ap.sport = htons(8080); statsEvent.si.ap.dport = htons(80); statsEvent.ts = 1; // set docker id statsEvent.wr_bytes = 1; tracker->UpdateConnStats(&statsEvent); tracker->RecordActive(); auto futureTime = std::chrono::steady_clock::now() + std::chrono::seconds(121); APSARA_TEST_TRUE(tracker->ReadyToDestroy(futureTime)); tracker->MarkClose(); tracker->RecordActive(); futureTime = std::chrono::steady_clock::now() + std::chrono::seconds(10); APSARA_TEST_FALSE(tracker->ReadyToDestroy(futureTime)); for (size_t i = 0; i < 12; i++) { tracker->CountDown(); } APSARA_TEST_TRUE(tracker->ReadyToDestroy(futureTime)); } void ConnectionUnittest::TestProtocolHandling() { auto tracker = CreateTestTracker(); struct conn_stats_event_t statsEvent = {}; statsEvent.protocol = support_proto_e::ProtoHTTP; statsEvent.role = support_role_e::IsUnknown; statsEvent.si.family = AF_INET; statsEvent.si.ap.saddr = 0x0100007F; // 127.0.0.1 statsEvent.si.ap.daddr = 0x0101A8C0; // 192.168.1.1 statsEvent.si.ap.sport = htons(8080); statsEvent.si.ap.dport = htons(80); statsEvent.ts = 1; // set docker id statsEvent.wr_bytes = 1; // will update protocol but will not update role tracker->UpdateConnStats(&statsEvent); APSARA_TEST_FALSE(tracker->IsL7MetaAttachReady()); support_proto_e pt = tracker->GetProtocol(); const StaticDataRow<&kConnTrackerTable>& attrs = tracker->GetConnTrackerAttrs(); APSARA_TEST_EQUAL(pt, support_proto_e::ProtoHTTP); APSARA_TEST_EQUAL(tracker->GetSourceIp(), "127.0.0.1"); APSARA_TEST_EQUAL(tracker->GetRemoteIp(), "192.168.1.1"); // role not set, so we cannot fill rpc attr APSARA_TEST_EQUAL(attrs.Get<kRpcType>(), ""); APSARA_TEST_EQUAL(attrs.Get<kCallKind>(), ""); APSARA_TEST_EQUAL(attrs.Get<kCallType>(), ""); LOG_DEBUG(sLogger, ("connection", tracker->DumpConnection())); // mock receive a data event // tracker->UpdateRole(support_role_e::IsClient); // tracker->UpdateProtocol(support_proto_e::ProtoHTTP); tracker->TryAttachL7Meta(support_role_e::IsClient, support_proto_e::ProtoHTTP); APSARA_TEST_TRUE(tracker->IsL7MetaAttachReady()); APSARA_TEST_EQUAL(attrs.Get<kRpcType>(), "25"); APSARA_TEST_EQUAL(attrs.Get<kCallKind>(), "http_client"); APSARA_TEST_EQUAL(attrs.Get<kCallType>(), "http_client"); // now rpc attributes all set APSARA_TEST_EQUAL(attrs[kConnTrackerTable.ColIndex(kRpcType.Name())], "25"); APSARA_TEST_EQUAL(attrs[kConnTrackerTable.ColIndex(kCallKind.Name())], "http_client"); APSARA_TEST_EQUAL(attrs[kConnTrackerTable.ColIndex(kCallType.Name())], "http_client"); LOG_DEBUG(sLogger, ("connection", tracker->DumpConnection())); // role chage ... // tracker->UpdateRole(support_role_e::IsServer); // APSARA_TEST_EQUAL(attrs.Get<kRpcType>(), "25"); // APSARA_TEST_EQUAL(attrs.Get<kCallKind>(), "http_client"); // APSARA_TEST_EQUAL(attrs.Get<kCallType>(), "http_client"); // APSARA_TEST_EQUAL(attrs[kConnTrackerTable.ColIndex(kRpcType.Name())], "25"); // APSARA_TEST_EQUAL(attrs[kConnTrackerTable.ColIndex(kCallKind.Name())], "http_client"); // APSARA_TEST_EQUAL(attrs[kConnTrackerTable.ColIndex(kCallType.Name())], "http_client"); // LOG_DEBUG(sLogger, ("connection", tracker->DumpConnection())); // protocol change ... // tracker->UpdateProtocol(support_proto_e::ProtoMySQL); // APSARA_TEST_EQUAL(std::string(attrs.Get<kRpcType>()), "25"); // APSARA_TEST_EQUAL(std::string(attrs.Get<kCallKind>()), "http_client"); // APSARA_TEST_EQUAL(std::string(attrs.Get<kCallType>()), "http_client"); // APSARA_TEST_EQUAL(std::string(attrs[kConnTrackerTable.ColIndex(kRpcType.Name())]), "25"); // APSARA_TEST_EQUAL(std::string(attrs[kConnTrackerTable.ColIndex(kCallKind.Name())]), "http_client"); // APSARA_TEST_EQUAL(std::string(attrs[kConnTrackerTable.ColIndex(kCallType.Name())]), "http_client"); // APSARA_TEST_EQUAL(tracker->GetProtocol(), support_proto_e::ProtoHTTP); // LOG_DEBUG(sLogger, ("connection", tracker->DumpConnection())); } void ConnectionUnittest::TestMetadataManagement() { auto tracker = CreateTestTracker(); struct conn_stats_event_t statsEvent = {}; statsEvent.protocol = support_proto_e::ProtoHTTP; statsEvent.role = support_role_e::IsUnknown; statsEvent.si.family = AF_INET; statsEvent.si.ap.saddr = 0x0100007F; // 127.0.0.1 statsEvent.si.ap.daddr = 0x0101A8C0; // 192.168.1.1 statsEvent.si.ap.sport = htons(8080); statsEvent.si.ap.dport = htons(80); statsEvent.ts = 1; // docker id std::string testCid = "/machine.slice/libpod-80b2ea13472c0d75a71af598ae2c01909bb5880151951bf194a3b24a44613106.scope"; memcpy(statsEvent.docker_id, testCid.c_str(), testCid.size()); LOG_DEBUG(sLogger, ("flags", tracker->GetMetaFlags())); // attach net metadata tracker->UpdateConnStats(&statsEvent); APSARA_TEST_EQUAL(tracker->GetContainerId(), "80b2ea13472c0d75a71af598ae2c01909bb5880151951bf194a3b24a44613106"); APSARA_TEST_FALSE(tracker->IsPeerMetaAttachReady()); APSARA_TEST_TRUE(tracker->IsL4MetaAttachReady()); APSARA_TEST_FALSE(tracker->IsL7MetaAttachReady()); APSARA_TEST_FALSE(tracker->IsSelfMetaAttachReady()); LOG_DEBUG(sLogger, ("flags", tracker->GetMetaFlags())); APSARA_TEST_FALSE(tracker->IsMetaAttachReadyForAppRecord()); APSARA_TEST_EQUAL(tracker->GetRemoteIp(), "192.168.1.1"); LOG_INFO(sLogger, ("step", "0")); // add k8s metadata cache // attach self pod metadata auto podInfo = std::make_shared<K8sPodInfo>(); podInfo->mContainerIds = {"1", "2"}; podInfo->mPodIp = "test-pod-ip"; podInfo->mPodName = "test-pod-name"; podInfo->mNamespace = "test-namespace"; LOG_INFO(sLogger, ("step", "0-0")); K8sMetadata::GetInstance().mContainerCache.insert(std::string(tracker->GetContainerId()), podInfo); LOG_INFO(sLogger, ("step", "0-1")); tracker->TryAttachSelfMeta(); LOG_INFO(sLogger, ("step", "0-2")); tracker->TryAttachPeerMeta(); LOG_INFO(sLogger, ("step", "0-3")); APSARA_TEST_TRUE(tracker->IsSelfMetaAttachReady()); APSARA_TEST_FALSE(tracker->IsPeerMetaAttachReady()); APSARA_TEST_TRUE(tracker->IsL4MetaAttachReady()); APSARA_TEST_FALSE(tracker->IsL7MetaAttachReady()); LOG_INFO(sLogger, ("step", "1")); // attach peer pod metadata auto peerPodInfo = std::make_shared<K8sPodInfo>(); peerPodInfo->mContainerIds = {"3", "4"}; peerPodInfo->mPodIp = "peer-pod-ip"; peerPodInfo->mPodName = "peer-pod-name"; peerPodInfo->mNamespace = "peer-namespace"; K8sMetadata::GetInstance().mIpCache.insert(std::string(tracker->GetRemoteIp()), peerPodInfo); LOG_INFO(sLogger, ("step", "2")); tracker->TryAttachSelfMeta(); tracker->TryAttachPeerMeta(); K8sMetadata::GetInstance().mIpCache.remove(std::string(tracker->GetRemoteIp())); K8sMetadata::GetInstance().mContainerCache.remove(std::string(tracker->GetContainerId())); tracker->IsL4MetaAttachReady(); APSARA_TEST_TRUE(tracker->IsSelfMetaAttachReady()); APSARA_TEST_TRUE(tracker->IsPeerMetaAttachReady()); APSARA_TEST_TRUE(tracker->IsL4MetaAttachReady()); APSARA_TEST_FALSE(tracker->IsL7MetaAttachReady()); LOG_INFO(sLogger, ("step", "3")); // mock receive data event ... // tracker->UpdateRole(support_role_e::IsClient); // tracker->UpdateProtocol(support_proto_e::ProtoHTTP); tracker->TryAttachL7Meta(support_role_e::IsClient, support_proto_e::ProtoHTTP); tracker->RecordActive(); APSARA_TEST_TRUE(tracker->IsSelfMetaAttachReady()); APSARA_TEST_TRUE(tracker->IsPeerMetaAttachReady()); APSARA_TEST_TRUE(tracker->IsL4MetaAttachReady()); APSARA_TEST_TRUE(tracker->IsL7MetaAttachReady()); LOG_INFO(sLogger, ("step", "4")); APSARA_TEST_TRUE(tracker->IsMetaAttachReadyForAppRecord()); } UNIT_TEST_CASE(ConnectionUnittest, TestBasicOperations); UNIT_TEST_CASE(ConnectionUnittest, TestProtocolHandling); UNIT_TEST_CASE(ConnectionUnittest, TestMetadataManagement); UNIT_TEST_CASE(ConnectionUnittest, TestStateTransitions); } // namespace ebpf } // namespace logtail UNIT_TEST_MAIN