core/unittest/batch/BatchItemUnittest.cpp (249 lines of code) (raw):

// Copyright 2024 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include "collection_pipeline/batch/BatchItem.h" #include "unittest/Unittest.h" using namespace std; namespace logtail { class EventBatchItemUnittest : public ::testing::Test { public: void TestReset(); void TestAddSourceBuffer(); void TestAdd(); void TestFlushEmpty(); void TestFlushGroupBatchItem(); void TestFlushBatchedEvensList(); void TestFlushBatchedEvensLists(); void TestExactlyOnce(); protected: static void SetUpTestCase() { sSourceBuffer.reset(new SourceBuffer); sEventGroup.reset(new PipelineEventGroup(sSourceBuffer)); sEventGroup->SetTag(string("key"), string("val")); sEventGroup->SetExactlyOnceCheckpoint(RangeCheckpointPtr(new RangeCheckpoint)); } void SetUp() override { StringBuffer b = sEventGroup->GetSourceBuffer()->CopyString(string("pack_id")); mItem.Reset(sEventGroup->GetSizedTags(), sEventGroup->GetSourceBuffer(), sEventGroup->GetExactlyOnceCheckpoint(), StringView(b.data, b.size)); } void TearDown() override { vector<PipelineEventPtr> tmp; sEventGroup->SwapEvents(tmp); } private: static shared_ptr<SourceBuffer> sSourceBuffer; static unique_ptr<PipelineEventGroup> sEventGroup; EventBatchItem<EventBatchStatus> mItem; }; shared_ptr<SourceBuffer> EventBatchItemUnittest::sSourceBuffer; unique_ptr<PipelineEventGroup> EventBatchItemUnittest::sEventGroup; void EventBatchItemUnittest::TestReset() { APSARA_TEST_EQUAL(1U, mItem.mBatch.mTags.mInner.size()); APSARA_TEST_NOT_EQUAL(nullptr, mItem.mBatch.mExactlyOnceCheckpoint); APSARA_TEST_STREQ("pack_id", mItem.mBatch.mPackIdPrefix.data()); APSARA_TEST_EQUAL(1U, mItem.mBatch.mSourceBuffers.size()); APSARA_TEST_EQUAL(1U, mItem.mSourceBuffers.size()); } void EventBatchItemUnittest::TestAddSourceBuffer() { shared_ptr<SourceBuffer> buffer1 = sEventGroup->GetSourceBuffer(); shared_ptr<SourceBuffer> buffer2 = make_shared<SourceBuffer>(); mItem.AddSourceBuffer(buffer1); APSARA_TEST_EQUAL(1U, mItem.mBatch.mSourceBuffers.size()); APSARA_TEST_EQUAL(1U, mItem.mSourceBuffers.size()); mItem.AddSourceBuffer(buffer2); APSARA_TEST_EQUAL(2U, mItem.mBatch.mSourceBuffers.size()); APSARA_TEST_EQUAL(2U, mItem.mSourceBuffers.size()); } void EventBatchItemUnittest::TestAdd() { sEventGroup->AddLogEvent(); PipelineEventPtr& e = sEventGroup->MutableEvents().back(); size_t size = e->DataSize(); mItem.Add(std::move(e)); APSARA_TEST_EQUAL(1U, mItem.mBatch.mEvents.size()); APSARA_TEST_EQUAL(1U, mItem.GetStatus().GetCnt()); APSARA_TEST_EQUAL(size, mItem.GetStatus().GetSize()); // APSARA_TEST_NOT_EQUAL(0, mItem.mTotalEnqueTimeMs); } void EventBatchItemUnittest::TestFlushEmpty() { { GroupBatchItem res; mItem.Flush(res); APSARA_TEST_TRUE(res.mGroups.empty()); } { BatchedEventsList res; mItem.Flush(res); APSARA_TEST_TRUE(res.empty()); } { vector<BatchedEventsList> res; mItem.Flush(res); APSARA_TEST_TRUE(res.empty()); } } void EventBatchItemUnittest::TestFlushGroupBatchItem() { sEventGroup->AddLogEvent(); PipelineEventPtr& e = sEventGroup->MutableEvents().back(); mItem.Add(std::move(e)); auto size = mItem.DataSize(); GroupBatchItem res; mItem.Flush(res); APSARA_TEST_EQUAL(1U, res.mGroups.size()); APSARA_TEST_EQUAL(size, res.mGroups[0].mSizeBytes); APSARA_TEST_TRUE(mItem.IsEmpty()); APSARA_TEST_TRUE(mItem.mBatch.mTags.mInner.empty()); APSARA_TEST_EQUAL(nullptr, mItem.mBatch.mExactlyOnceCheckpoint); APSARA_TEST_TRUE(mItem.mBatch.mPackIdPrefix.empty()); APSARA_TEST_TRUE(mItem.mBatch.mSourceBuffers.empty()); APSARA_TEST_TRUE(mItem.mSourceBuffers.empty()); APSARA_TEST_EQUAL(0U, mItem.GetStatus().GetCnt()); APSARA_TEST_EQUAL(0U, mItem.GetStatus().GetSize()); APSARA_TEST_EQUAL(0, mItem.GetStatus().GetCreateTime()); // APSARA_TEST_EQUAL(0, mItem.mTotalEnqueTimeMs); } void EventBatchItemUnittest::TestFlushBatchedEvensList() { sEventGroup->AddLogEvent(); PipelineEventPtr& e = sEventGroup->MutableEvents().back(); mItem.Add(std::move(e)); auto size = mItem.DataSize(); BatchedEventsList res; mItem.Flush(res); APSARA_TEST_EQUAL(1U, res.size()); APSARA_TEST_EQUAL(1U, res[0].mEvents.size()); APSARA_TEST_EQUAL(1U, res[0].mTags.mInner.size()); APSARA_TEST_NOT_EQUAL(nullptr, res[0].mExactlyOnceCheckpoint); APSARA_TEST_STREQ("pack_id", res[0].mPackIdPrefix.data()); APSARA_TEST_EQUAL(1U, res[0].mSourceBuffers.size()); APSARA_TEST_EQUAL(size, res[0].mSizeBytes); APSARA_TEST_TRUE(mItem.IsEmpty()); APSARA_TEST_TRUE(mItem.mBatch.mTags.mInner.empty()); APSARA_TEST_EQUAL(nullptr, mItem.mBatch.mExactlyOnceCheckpoint); APSARA_TEST_TRUE(mItem.mBatch.mPackIdPrefix.empty()); APSARA_TEST_TRUE(mItem.mBatch.mSourceBuffers.empty()); APSARA_TEST_TRUE(mItem.mSourceBuffers.empty()); APSARA_TEST_EQUAL(0U, mItem.GetStatus().GetCnt()); APSARA_TEST_EQUAL(0U, mItem.GetStatus().GetSize()); APSARA_TEST_EQUAL(0, mItem.GetStatus().GetCreateTime()); // APSARA_TEST_EQUAL(0, mItem.mTotalEnqueTimeMs); } void EventBatchItemUnittest::TestFlushBatchedEvensLists() { sEventGroup->AddLogEvent(); PipelineEventPtr& e = sEventGroup->MutableEvents().back(); mItem.Add(std::move(e)); auto size = mItem.DataSize(); vector<BatchedEventsList> res; mItem.Flush(res); APSARA_TEST_EQUAL(1U, res.size()); APSARA_TEST_EQUAL(1U, res[0].size()); APSARA_TEST_EQUAL(1U, res[0][0].mEvents.size()); APSARA_TEST_EQUAL(1U, res[0][0].mTags.mInner.size()); APSARA_TEST_NOT_EQUAL(nullptr, res[0][0].mExactlyOnceCheckpoint); APSARA_TEST_STREQ("pack_id", res[0][0].mPackIdPrefix.data()); APSARA_TEST_EQUAL(1U, res[0][0].mSourceBuffers.size()); APSARA_TEST_EQUAL(size, res[0][0].mSizeBytes); APSARA_TEST_TRUE(mItem.IsEmpty()); APSARA_TEST_TRUE(mItem.mBatch.mTags.mInner.empty()); APSARA_TEST_EQUAL(nullptr, mItem.mBatch.mExactlyOnceCheckpoint); APSARA_TEST_TRUE(mItem.mBatch.mPackIdPrefix.empty()); APSARA_TEST_TRUE(mItem.mBatch.mSourceBuffers.empty()); APSARA_TEST_TRUE(mItem.mSourceBuffers.empty()); APSARA_TEST_EQUAL(0U, mItem.GetStatus().GetCnt()); APSARA_TEST_EQUAL(0U, mItem.GetStatus().GetSize()); APSARA_TEST_EQUAL(0, mItem.GetStatus().GetCreateTime()); // APSARA_TEST_EQUAL(0, mItem.mTotalEnqueTimeMs); } void EventBatchItemUnittest::TestExactlyOnce() { sEventGroup->AddLogEvent()->SetPosition(1, 5); sEventGroup->AddLogEvent()->SetPosition(6, 10); for (auto& e : sEventGroup->MutableEvents()) { mItem.Add(std::move(e)); } BatchedEventsList res; mItem.Flush(res); APSARA_TEST_EQUAL(1U, res[0].mExactlyOnceCheckpoint->data.read_offset()); APSARA_TEST_EQUAL(15U, res[0].mExactlyOnceCheckpoint->data.read_length()); } UNIT_TEST_CASE(EventBatchItemUnittest, TestReset) UNIT_TEST_CASE(EventBatchItemUnittest, TestAdd) UNIT_TEST_CASE(EventBatchItemUnittest, TestAddSourceBuffer) UNIT_TEST_CASE(EventBatchItemUnittest, TestFlushEmpty) UNIT_TEST_CASE(EventBatchItemUnittest, TestFlushGroupBatchItem) UNIT_TEST_CASE(EventBatchItemUnittest, TestFlushBatchedEvensList) UNIT_TEST_CASE(EventBatchItemUnittest, TestFlushBatchedEvensLists) UNIT_TEST_CASE(EventBatchItemUnittest, TestExactlyOnce) class GroupBatchItemUnittest : public ::testing::Test { public: void TestAdd(); void TestFlushEmpty(); void TestFlushBatchedEvensList(); void TestFlushBatchedEvensLists(); protected: void SetUp() override { shared_ptr<SourceBuffer> sourceBuffer = make_shared<SourceBuffer>(); PipelineEventGroup eventGroup(sourceBuffer); eventGroup.SetTag(string("key"), string("val")); eventGroup.AddLogEvent(); eventGroup.SetExactlyOnceCheckpoint(RangeCheckpointPtr(new RangeCheckpoint)); mBatch.mEvents = std::move(eventGroup.MutableEvents()); mBatch.mSourceBuffers.emplace_back(std::move(eventGroup.GetSourceBuffer())); mBatch.mTags = std::move(eventGroup.GetSizedTags()); mBatch.mSizeBytes = 100; } void TearDown() override { mBatch.Clear(); mItem.Clear(); } private: BatchedEvents mBatch; GroupBatchItem mItem; }; void GroupBatchItemUnittest::TestAdd() { size_t size = mBatch.mSizeBytes; mItem.Add(std::move(mBatch), 1234567890000); APSARA_TEST_EQUAL(1U, mItem.mGroups.size()); APSARA_TEST_EQUAL(size, mItem.GetStatus().GetSize()); // APSARA_TEST_EQUAL(1234567890000, mItem.TotalEnqueTimeMs()); APSARA_TEST_EQUAL(1U, mItem.EventSize()); APSARA_TEST_EQUAL(1U, mItem.GroupSize()); APSARA_TEST_EQUAL(100U, mItem.DataSize()); } void GroupBatchItemUnittest::TestFlushEmpty() { { BatchedEventsList res; mItem.Flush(res); APSARA_TEST_TRUE(res.empty()); } { vector<BatchedEventsList> res; mItem.Flush(res); APSARA_TEST_TRUE(res.empty()); } } void GroupBatchItemUnittest::TestFlushBatchedEvensList() { mItem.Add(std::move(mBatch), 1234567890000); BatchedEventsList res; mItem.Flush(res); APSARA_TEST_EQUAL(1U, res.size()); APSARA_TEST_TRUE(mItem.IsEmpty()); APSARA_TEST_EQUAL(0U, mItem.GetStatus().GetSize()); APSARA_TEST_EQUAL(0, mItem.GetStatus().GetCreateTime()); APSARA_TEST_EQUAL(0, mItem.TotalEnqueTimeMs()); APSARA_TEST_EQUAL(0U, mItem.EventSize()); APSARA_TEST_EQUAL(0U, mItem.GroupSize()); APSARA_TEST_EQUAL(0U, mItem.DataSize()); } void GroupBatchItemUnittest::TestFlushBatchedEvensLists() { mItem.Add(std::move(mBatch), 1234567890000); vector<BatchedEventsList> res; mItem.Flush(res); APSARA_TEST_EQUAL(1U, res.size()); APSARA_TEST_EQUAL(1U, res[0].size()); APSARA_TEST_TRUE(mItem.IsEmpty()); APSARA_TEST_EQUAL(0U, mItem.GetStatus().GetSize()); APSARA_TEST_EQUAL(0, mItem.GetStatus().GetCreateTime()); APSARA_TEST_EQUAL(0, mItem.TotalEnqueTimeMs()); APSARA_TEST_EQUAL(0U, mItem.EventSize()); APSARA_TEST_EQUAL(0U, mItem.GroupSize()); APSARA_TEST_EQUAL(0U, mItem.DataSize()); } UNIT_TEST_CASE(GroupBatchItemUnittest, TestAdd) UNIT_TEST_CASE(GroupBatchItemUnittest, TestFlushEmpty) UNIT_TEST_CASE(GroupBatchItemUnittest, TestFlushBatchedEvensList) UNIT_TEST_CASE(GroupBatchItemUnittest, TestFlushBatchedEvensLists) } // namespace logtail UNIT_TEST_MAIN