core/unittest/serializer/SLSSerializerUnittest.cpp (534 lines of code) (raw):

// Copyright 2024 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include "collection_pipeline/serializer/SLSSerializer.h" #include "plugin/flusher/sls/FlusherSLS.h" #include "unittest/Unittest.h" DECLARE_FLAG_INT32(max_send_log_group_size); using namespace std; namespace logtail { class SLSSerializerUnittest : public ::testing::Test { public: void TestSerializeEventGroup(); void TestSerializeEventGroupList(); protected: static void SetUpTestCase() { sFlusher = make_unique<FlusherSLS>(); } void SetUp() override { mCtx.SetConfigName("test_config"); sFlusher->SetContext(mCtx); sFlusher->SetMetricsRecordRef(FlusherSLS::sName, "1"); } private: BatchedEvents CreateBatchedLogEvents(bool enableNanosecond, bool withEmptyContent = false, bool withNonEmptyContent = true); BatchedEvents CreateBatchedMetricEvents(bool enableNanosecond, uint32_t nanoTimestamp, bool emptyValue, bool onlyOneTag); BatchedEvents CreateBatchedRawEvents(bool enableNanosecond, bool withEmptyContent = false, bool withNonEmptyContent = true); BatchedEvents CreateBatchedSpanEvents(); static unique_ptr<FlusherSLS> sFlusher; CollectionPipelineContext mCtx; }; unique_ptr<FlusherSLS> SLSSerializerUnittest::sFlusher; void SLSSerializerUnittest::TestSerializeEventGroup() { SLSEventGroupSerializer serializer(sFlusher.get()); { // log { // nano second disabled, and set string res, errorMsg; APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedLogEvents(false), res, errorMsg)); sls_logs::LogGroup logGroup; APSARA_TEST_TRUE(logGroup.ParseFromString(res)); APSARA_TEST_EQUAL(1, logGroup.logs_size()); APSARA_TEST_EQUAL(1, logGroup.logs(0).contents_size()); APSARA_TEST_STREQ("key", logGroup.logs(0).contents(0).key().c_str()); APSARA_TEST_STREQ("value", logGroup.logs(0).contents(0).value().c_str()); APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time()); APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns()); APSARA_TEST_EQUAL(1, logGroup.logtags_size()); APSARA_TEST_STREQ("__pack_id__", logGroup.logtags(0).key().c_str()); APSARA_TEST_STREQ("pack_id", logGroup.logtags(0).value().c_str()); APSARA_TEST_STREQ("machine_uuid", logGroup.machineuuid().c_str()); APSARA_TEST_STREQ("source", logGroup.source().c_str()); APSARA_TEST_STREQ("topic", logGroup.topic().c_str()); } { // nano second enabled, and set const_cast<GlobalConfig&>(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = true; string res, errorMsg; APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedLogEvents(true), res, errorMsg)); sls_logs::LogGroup logGroup; APSARA_TEST_TRUE(logGroup.ParseFromString(res)); APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time()); APSARA_TEST_EQUAL(1U, logGroup.logs(0).time_ns()); const_cast<GlobalConfig&>(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = false; } { // nano second enabled, not set const_cast<GlobalConfig&>(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = true; string res, errorMsg; APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedLogEvents(false), res, errorMsg)); sls_logs::LogGroup logGroup; APSARA_TEST_TRUE(logGroup.ParseFromString(res)); APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time()); APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns()); const_cast<GlobalConfig&>(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = false; } { // with empty event string res, errorMsg; APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedLogEvents(false, true, true), res, errorMsg)); sls_logs::LogGroup logGroup; APSARA_TEST_TRUE(logGroup.ParseFromString(res)); APSARA_TEST_EQUAL(1, logGroup.logs_size()); APSARA_TEST_EQUAL(1, logGroup.logs(0).contents_size()); APSARA_TEST_STREQ("key", logGroup.logs(0).contents(0).key().c_str()); APSARA_TEST_STREQ("value", logGroup.logs(0).contents(0).value().c_str()); APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time()); APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns()); APSARA_TEST_EQUAL(1, logGroup.logtags_size()); APSARA_TEST_STREQ("__pack_id__", logGroup.logtags(0).key().c_str()); APSARA_TEST_STREQ("pack_id", logGroup.logtags(0).value().c_str()); APSARA_TEST_STREQ("machine_uuid", logGroup.machineuuid().c_str()); APSARA_TEST_STREQ("source", logGroup.source().c_str()); APSARA_TEST_STREQ("topic", logGroup.topic().c_str()); } { // only empty event string res, errorMsg; APSARA_TEST_FALSE(serializer.DoSerialize(CreateBatchedLogEvents(false, true, false), res, errorMsg)); } } // namespace logtail { // metric { // only 1 tag string res, errorMsg; APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedMetricEvents(false, 0, false, true), res, errorMsg)); sls_logs::LogGroup logGroup; APSARA_TEST_TRUE(logGroup.ParseFromString(res)); APSARA_TEST_EQUAL(1, logGroup.logs_size()); APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time()); APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns()); APSARA_TEST_EQUAL(logGroup.logs(0).contents_size(), 4); APSARA_TEST_EQUAL(logGroup.logs(0).contents(0).key(), "__labels__"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(0).value(), "key1#$#value1"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(1).key(), "__time_nano__"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(1).value(), "1234567890"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(2).key(), "__value__"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(2).value(), "0.100000"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(3).key(), "__name__"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(3).value(), "test_gauge"); APSARA_TEST_EQUAL(1, logGroup.logtags_size()); APSARA_TEST_STREQ("__pack_id__", logGroup.logtags(0).key().c_str()); APSARA_TEST_STREQ("pack_id", logGroup.logtags(0).value().c_str()); APSARA_TEST_STREQ("machine_uuid", logGroup.machineuuid().c_str()); APSARA_TEST_STREQ("source", logGroup.source().c_str()); APSARA_TEST_STREQ("topic", logGroup.topic().c_str()); } { // nano second disabled string res, errorMsg; APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedMetricEvents(false, 0, false, false), res, errorMsg)); sls_logs::LogGroup logGroup; APSARA_TEST_TRUE(logGroup.ParseFromString(res)); APSARA_TEST_EQUAL(1, logGroup.logs_size()); APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time()); APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns()); APSARA_TEST_EQUAL(logGroup.logs(0).contents_size(), 4); APSARA_TEST_EQUAL(logGroup.logs(0).contents(0).key(), "__labels__"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(0).value(), "key1#$#value1|key2#$#value2"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(1).key(), "__time_nano__"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(1).value(), "1234567890"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(2).key(), "__value__"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(2).value(), "0.100000"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(3).key(), "__name__"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(3).value(), "test_gauge"); APSARA_TEST_EQUAL(1, logGroup.logtags_size()); APSARA_TEST_STREQ("__pack_id__", logGroup.logtags(0).key().c_str()); APSARA_TEST_STREQ("pack_id", logGroup.logtags(0).value().c_str()); APSARA_TEST_STREQ("machine_uuid", logGroup.machineuuid().c_str()); APSARA_TEST_STREQ("source", logGroup.source().c_str()); APSARA_TEST_STREQ("topic", logGroup.topic().c_str()); } { // nano second enabled, less than 9 digits string res, errorMsg; APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedMetricEvents(true, 1, false, false), res, errorMsg)); sls_logs::LogGroup logGroup; APSARA_TEST_TRUE(logGroup.ParseFromString(res)); APSARA_TEST_EQUAL(1, logGroup.logs_size()); APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time()); APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns()); APSARA_TEST_EQUAL(logGroup.logs(0).contents_size(), 4); APSARA_TEST_EQUAL(logGroup.logs(0).contents(0).key(), "__labels__"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(0).value(), "key1#$#value1|key2#$#value2"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(1).key(), "__time_nano__"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(1).value(), "1234567890000000001"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(2).key(), "__value__"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(2).value(), "0.100000"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(3).key(), "__name__"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(3).value(), "test_gauge"); APSARA_TEST_EQUAL(1, logGroup.logtags_size()); APSARA_TEST_STREQ("__pack_id__", logGroup.logtags(0).key().c_str()); APSARA_TEST_STREQ("pack_id", logGroup.logtags(0).value().c_str()); APSARA_TEST_STREQ("machine_uuid", logGroup.machineuuid().c_str()); APSARA_TEST_STREQ("source", logGroup.source().c_str()); APSARA_TEST_STREQ("topic", logGroup.topic().c_str()); } { // nano second enabled, exactly 9 digits string res, errorMsg; APSARA_TEST_TRUE( serializer.DoSerialize(CreateBatchedMetricEvents(true, 999999999, false, false), res, errorMsg)); sls_logs::LogGroup logGroup; APSARA_TEST_TRUE(logGroup.ParseFromString(res)); APSARA_TEST_EQUAL(1, logGroup.logs_size()); APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time()); APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns()); APSARA_TEST_EQUAL(logGroup.logs(0).contents_size(), 4); APSARA_TEST_EQUAL(logGroup.logs(0).contents(0).key(), "__labels__"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(0).value(), "key1#$#value1|key2#$#value2"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(1).key(), "__time_nano__"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(1).value(), "1234567890999999999"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(2).key(), "__value__"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(2).value(), "0.100000"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(3).key(), "__name__"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(3).value(), "test_gauge"); APSARA_TEST_EQUAL(1, logGroup.logtags_size()); APSARA_TEST_STREQ("__pack_id__", logGroup.logtags(0).key().c_str()); APSARA_TEST_STREQ("pack_id", logGroup.logtags(0).value().c_str()); APSARA_TEST_STREQ("machine_uuid", logGroup.machineuuid().c_str()); APSARA_TEST_STREQ("source", logGroup.source().c_str()); APSARA_TEST_STREQ("topic", logGroup.topic().c_str()); } { // timestamp invalid string res, errorMsg; auto batch = CreateBatchedMetricEvents(false, 0, false, false); batch.mEvents[0]->SetTimestamp(123); APSARA_TEST_FALSE(serializer.DoSerialize(std::move(batch), res, errorMsg)); } { // empty metric value string res, errorMsg; APSARA_TEST_FALSE(serializer.DoSerialize(CreateBatchedMetricEvents(false, 0, true, false), res, errorMsg)); } } { // span string res, errorMsg; auto events = CreateBatchedSpanEvents(); APSARA_TEST_EQUAL(events.mEvents.size(), 1U); APSARA_TEST_TRUE(events.mEvents[0]->GetType() == PipelineEvent::Type::SPAN); APSARA_TEST_TRUE(serializer.DoSerialize(std::move(events), res, errorMsg)); sls_logs::LogGroup logGroup; APSARA_TEST_TRUE(logGroup.ParseFromString(res)); APSARA_TEST_EQUAL(1, logGroup.logs_size()); APSARA_TEST_EQUAL(13, logGroup.logs(0).contents_size()); // traceid APSARA_TEST_EQUAL(logGroup.logs(0).contents(0).key(), "traceId"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(0).value(), "trace-1-2-3-4-5"); // span id APSARA_TEST_EQUAL(logGroup.logs(0).contents(1).key(), "spanId"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(1).value(), "span-1-2-3-4-5"); // parent span id APSARA_TEST_EQUAL(logGroup.logs(0).contents(2).key(), "parentSpanId"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(2).value(), "parent-1-2-3-4-5"); // spanName APSARA_TEST_EQUAL(logGroup.logs(0).contents(3).key(), "spanName"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(3).value(), "/oneagent/qianlu/local/1"); // kind APSARA_TEST_EQUAL(logGroup.logs(0).contents(4).key(), "kind"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(4).value(), "client"); // code APSARA_TEST_EQUAL(logGroup.logs(0).contents(5).key(), "statusCode"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(5).value(), "OK"); // traceState APSARA_TEST_EQUAL(logGroup.logs(0).contents(6).key(), "traceState"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(6).value(), "test-state"); // attributes APSARA_TEST_EQUAL(logGroup.logs(0).contents(7).key(), "attributes"); auto attrs = logGroup.logs(0).contents(7).value(); Json::Value jsonVal; Json::CharReaderBuilder readerBuilder; std::string errs; std::istringstream s(attrs); bool ret = Json::parseFromStream(readerBuilder, s, &jsonVal, &errs); APSARA_TEST_TRUE(ret); APSARA_TEST_EQUAL(jsonVal.size(), 10U); APSARA_TEST_EQUAL(jsonVal["rpcType"].asString(), "25"); APSARA_TEST_EQUAL(jsonVal["scope-tag-0"].asString(), "scope-value-0"); // APSARA_TEST_EQUAL(logGroup.logs(0).contents(7).value(), ""); // links APSARA_TEST_EQUAL(logGroup.logs(0).contents(8).key(), "links"); auto linksStr = logGroup.logs(0).contents(8).value(); std::istringstream ss(linksStr); ret = Json::parseFromStream(readerBuilder, ss, &jsonVal, &errs); APSARA_TEST_TRUE(ret); APSARA_TEST_EQUAL(jsonVal.size(), 1U); for (auto& link : jsonVal) { APSARA_TEST_EQUAL(link["spanId"].asString(), "inner-link-spanid"); APSARA_TEST_EQUAL(link["traceId"].asString(), "inner-link-traceid"); APSARA_TEST_EQUAL(link["traceState"].asString(), "inner-link-trace-state"); } // events APSARA_TEST_EQUAL(logGroup.logs(0).contents(9).key(), "events"); auto eventsStr = logGroup.logs(0).contents(9).value(); std::istringstream sss(eventsStr); ret = Json::parseFromStream(readerBuilder, sss, &jsonVal, &errs); APSARA_TEST_TRUE(ret); APSARA_TEST_EQUAL(jsonVal.size(), 1U); for (auto& event : jsonVal) { APSARA_TEST_EQUAL(event["name"].asString(), "inner-event"); APSARA_TEST_EQUAL(event["timestamp"].asString(), "1000"); } // start APSARA_TEST_EQUAL(logGroup.logs(0).contents(10).key(), "startTime"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(10).value(), "1000"); // end APSARA_TEST_EQUAL(logGroup.logs(0).contents(11).key(), "endTime"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(11).value(), "2000"); // duration APSARA_TEST_EQUAL(logGroup.logs(0).contents(12).key(), "duration"); APSARA_TEST_EQUAL(logGroup.logs(0).contents(12).value(), "1000"); } { // raw { // nano second disabled, and set string res, errorMsg; APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedRawEvents(false), res, errorMsg)); sls_logs::LogGroup logGroup; APSARA_TEST_TRUE(logGroup.ParseFromString(res)); APSARA_TEST_EQUAL(1, logGroup.logs_size()); APSARA_TEST_EQUAL(1, logGroup.logs(0).contents_size()); APSARA_TEST_STREQ("content", logGroup.logs(0).contents(0).key().c_str()); APSARA_TEST_STREQ("value", logGroup.logs(0).contents(0).value().c_str()); APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time()); APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns()); APSARA_TEST_EQUAL(1, logGroup.logtags_size()); APSARA_TEST_STREQ("__pack_id__", logGroup.logtags(0).key().c_str()); APSARA_TEST_STREQ("pack_id", logGroup.logtags(0).value().c_str()); APSARA_TEST_STREQ("machine_uuid", logGroup.machineuuid().c_str()); APSARA_TEST_STREQ("source", logGroup.source().c_str()); APSARA_TEST_STREQ("topic", logGroup.topic().c_str()); } { // nano second enabled, and set const_cast<GlobalConfig&>(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = true; string res, errorMsg; APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedRawEvents(true), res, errorMsg)); sls_logs::LogGroup logGroup; APSARA_TEST_TRUE(logGroup.ParseFromString(res)); APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time()); APSARA_TEST_EQUAL(1U, logGroup.logs(0).time_ns()); const_cast<GlobalConfig&>(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = false; } { // nano second enabled, not set const_cast<GlobalConfig&>(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = true; string res, errorMsg; APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedRawEvents(false), res, errorMsg)); sls_logs::LogGroup logGroup; APSARA_TEST_TRUE(logGroup.ParseFromString(res)); APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time()); APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns()); const_cast<GlobalConfig&>(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = false; } { // with empty event string res, errorMsg; APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedRawEvents(false, true, true), res, errorMsg)); sls_logs::LogGroup logGroup; APSARA_TEST_TRUE(logGroup.ParseFromString(res)); APSARA_TEST_EQUAL(2, logGroup.logs_size()); APSARA_TEST_EQUAL(1, logGroup.logs(0).contents_size()); APSARA_TEST_STREQ("content", logGroup.logs(0).contents(0).key().c_str()); APSARA_TEST_STREQ("value", logGroup.logs(0).contents(0).value().c_str()); APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time()); APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns()); APSARA_TEST_EQUAL(1, logGroup.logs(1).contents_size()); APSARA_TEST_STREQ("content", logGroup.logs(1).contents(0).key().c_str()); APSARA_TEST_STREQ("", logGroup.logs(1).contents(0).value().c_str()); APSARA_TEST_EQUAL(1234567890U, logGroup.logs(1).time()); APSARA_TEST_FALSE(logGroup.logs(1).has_time_ns()); APSARA_TEST_EQUAL(1, logGroup.logtags_size()); APSARA_TEST_STREQ("__pack_id__", logGroup.logtags(0).key().c_str()); APSARA_TEST_STREQ("pack_id", logGroup.logtags(0).value().c_str()); APSARA_TEST_STREQ("machine_uuid", logGroup.machineuuid().c_str()); APSARA_TEST_STREQ("source", logGroup.source().c_str()); APSARA_TEST_STREQ("topic", logGroup.topic().c_str()); } { // only empty event string res, errorMsg; APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedRawEvents(false, true, false), res, errorMsg)); sls_logs::LogGroup logGroup; APSARA_TEST_TRUE(logGroup.ParseFromString(res)); APSARA_TEST_EQUAL(1, logGroup.logs_size()); APSARA_TEST_EQUAL(1, logGroup.logs(0).contents_size()); APSARA_TEST_STREQ("content", logGroup.logs(0).contents(0).key().c_str()); APSARA_TEST_STREQ("", logGroup.logs(0).contents(0).value().c_str()); APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time()); APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns()); APSARA_TEST_EQUAL(1, logGroup.logtags_size()); APSARA_TEST_STREQ("__pack_id__", logGroup.logtags(0).key().c_str()); APSARA_TEST_STREQ("pack_id", logGroup.logtags(0).value().c_str()); APSARA_TEST_STREQ("machine_uuid", logGroup.machineuuid().c_str()); APSARA_TEST_STREQ("source", logGroup.source().c_str()); APSARA_TEST_STREQ("topic", logGroup.topic().c_str()); } } { // log group exceed size limit INT32_FLAG(max_send_log_group_size) = 0; string res, errorMsg; APSARA_TEST_FALSE(serializer.DoSerialize(CreateBatchedLogEvents(true, false), res, errorMsg)); INT32_FLAG(max_send_log_group_size) = 10 * 1024 * 1024; } { // empty log group PipelineEventGroup group(make_shared<SourceBuffer>()); BatchedEvents batch(std::move(group.MutableEvents()), std::move(group.GetSizedTags()), std::move(group.GetSourceBuffer()), group.GetMetadata(EventGroupMetaKey::SOURCE_ID), std::move(group.GetExactlyOnceCheckpoint())); string res, errorMsg; APSARA_TEST_FALSE(serializer.DoSerialize(std::move(batch), res, errorMsg)); } } void SLSSerializerUnittest::TestSerializeEventGroupList() { vector<CompressedLogGroup> v; v.emplace_back("data1", 10); SLSEventGroupListSerializer serializer(sFlusher.get()); string res, errorMsg; APSARA_TEST_TRUE(serializer.DoSerialize(std::move(v), res, errorMsg)); sls_logs::SlsLogPackageList logPackageList; APSARA_TEST_TRUE(logPackageList.ParseFromString(res)); APSARA_TEST_EQUAL(1, logPackageList.packages_size()); APSARA_TEST_STREQ("data1", logPackageList.packages(0).data().c_str()); APSARA_TEST_EQUAL(10, logPackageList.packages(0).uncompress_size()); APSARA_TEST_EQUAL(sls_logs::SlsCompressType::SLS_CMP_NONE, logPackageList.packages(0).compress_type()); } BatchedEvents SLSSerializerUnittest::CreateBatchedLogEvents(bool enableNanosecond, bool withEmptyContent, bool withNonEmptyContent) { PipelineEventGroup group(make_shared<SourceBuffer>()); group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic"); group.SetTag(LOG_RESERVED_KEY_SOURCE, "source"); group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "machine_uuid"); group.SetTag(LOG_RESERVED_KEY_PACKAGE_ID, "pack_id"); StringBuffer b = group.GetSourceBuffer()->CopyString(string("pack_id")); group.SetMetadataNoCopy(EventGroupMetaKey::SOURCE_ID, StringView(b.data, b.size)); group.SetExactlyOnceCheckpoint(RangeCheckpointPtr(new RangeCheckpoint)); if (withNonEmptyContent) { LogEvent* e = group.AddLogEvent(); e->SetContent(string("key"), string("value")); if (enableNanosecond) { e->SetTimestamp(1234567890, 1); } else { e->SetTimestamp(1234567890); } } if (withEmptyContent) { LogEvent* e = group.AddLogEvent(); if (enableNanosecond) { e->SetTimestamp(1234567890, 1); } else { e->SetTimestamp(1234567890); } } BatchedEvents batch(std::move(group.MutableEvents()), std::move(group.GetSizedTags()), std::move(group.GetSourceBuffer()), group.GetMetadata(EventGroupMetaKey::SOURCE_ID), std::move(group.GetExactlyOnceCheckpoint())); return batch; } BatchedEvents SLSSerializerUnittest::CreateBatchedMetricEvents(bool enableNanosecond, uint32_t nanoTimestamp, bool emptyValue, bool onlyOneTag) { PipelineEventGroup group(make_shared<SourceBuffer>()); group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic"); group.SetTag(LOG_RESERVED_KEY_SOURCE, "source"); group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "machine_uuid"); group.SetTag(LOG_RESERVED_KEY_PACKAGE_ID, "pack_id"); StringBuffer b = group.GetSourceBuffer()->CopyString(string("pack_id")); group.SetMetadataNoCopy(EventGroupMetaKey::SOURCE_ID, StringView(b.data, b.size)); group.SetExactlyOnceCheckpoint(RangeCheckpointPtr(new RangeCheckpoint)); MetricEvent* e = group.AddMetricEvent(); e->SetTag(string("key1"), string("value1")); if (!onlyOneTag) { e->SetTag(string("key2"), string("value2")); } if (enableNanosecond) { e->SetTimestamp(1234567890, nanoTimestamp); } else { e->SetTimestamp(1234567890); } if (!emptyValue) { double value = 0.1; e->SetValue<UntypedSingleValue>(value); } e->SetName("test_gauge"); BatchedEvents batch(std::move(group.MutableEvents()), std::move(group.GetSizedTags()), std::move(group.GetSourceBuffer()), group.GetMetadata(EventGroupMetaKey::SOURCE_ID), std::move(group.GetExactlyOnceCheckpoint())); return batch; } BatchedEvents SLSSerializerUnittest::CreateBatchedRawEvents(bool enableNanosecond, bool withEmptyContent, bool withNonEmptyContent) { PipelineEventGroup group(make_shared<SourceBuffer>()); group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic"); group.SetTag(LOG_RESERVED_KEY_SOURCE, "source"); group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "machine_uuid"); group.SetTag(LOG_RESERVED_KEY_PACKAGE_ID, "pack_id"); StringBuffer b = group.GetSourceBuffer()->CopyString(string("pack_id")); group.SetMetadataNoCopy(EventGroupMetaKey::SOURCE_ID, StringView(b.data, b.size)); group.SetExactlyOnceCheckpoint(RangeCheckpointPtr(new RangeCheckpoint)); if (withNonEmptyContent) { RawEvent* e = group.AddRawEvent(); e->SetContent(string("value")); if (enableNanosecond) { e->SetTimestamp(1234567890, 1); } else { e->SetTimestamp(1234567890); } } if (withEmptyContent) { RawEvent* e = group.AddRawEvent(); e->SetContent(string("")); if (enableNanosecond) { e->SetTimestamp(1234567890, 1); } else { e->SetTimestamp(1234567890); } } BatchedEvents batch(std::move(group.MutableEvents()), std::move(group.GetSizedTags()), std::move(group.GetSourceBuffer()), group.GetMetadata(EventGroupMetaKey::SOURCE_ID), std::move(group.GetExactlyOnceCheckpoint())); return batch; } BatchedEvents SLSSerializerUnittest::CreateBatchedSpanEvents() { PipelineEventGroup group(make_shared<SourceBuffer>()); group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic"); group.SetTag(LOG_RESERVED_KEY_SOURCE, "source"); group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "aaa"); group.SetTag(LOG_RESERVED_KEY_PACKAGE_ID, "bbb"); auto now = std::chrono::system_clock::now(); auto duration = now.time_since_epoch(); auto seconds = std::chrono::duration_cast<std::chrono::seconds>(duration).count(); // auto nano = std::chrono::duration_cast<std::chrono::nanoseconds>(duration).count(); StringBuffer b = group.GetSourceBuffer()->CopyString(string("pack_id")); group.SetMetadataNoCopy(EventGroupMetaKey::SOURCE_ID, StringView(b.data, b.size)); group.SetExactlyOnceCheckpoint(RangeCheckpointPtr(new RangeCheckpoint)); SpanEvent* spanEvent = group.AddSpanEvent(); spanEvent->SetScopeTag(std::string("scope-tag-0"), std::string("scope-value-0")); spanEvent->SetTag(std::string("workloadName"), std::string("arms-oneagent-test-ql")); spanEvent->SetTag(std::string("workloadKind"), std::string("faceless")); spanEvent->SetTag(std::string("source_ip"), std::string("10.54.0.33")); spanEvent->SetTag(std::string("host"), std::string("10.54.0.33")); spanEvent->SetTag(std::string("rpc"), std::string("/oneagent/qianlu/local/1")); spanEvent->SetTag(std::string("rpcType"), std::string("25")); spanEvent->SetTag(std::string("callType"), std::string("http-client")); spanEvent->SetTag(std::string("statusCode"), std::string("200")); spanEvent->SetTag(std::string("version"), std::string("HTTP1.1")); auto innerEvent = spanEvent->AddEvent(); innerEvent->SetTag(std::string("innner-event-key-0"), std::string("inner-event-value-0")); innerEvent->SetTag(std::string("innner-event-key-1"), std::string("inner-event-value-1")); innerEvent->SetName("inner-event"); innerEvent->SetTimestampNs(1000); auto innerLink = spanEvent->AddLink(); innerLink->SetTag(std::string("innner-link-key-0"), std::string("inner-link-value-0")); innerLink->SetTag(std::string("innner-link-key-1"), std::string("inner-link-value-1")); innerLink->SetTraceId("inner-link-traceid"); innerLink->SetSpanId("inner-link-spanid"); innerLink->SetTraceState("inner-link-trace-state"); spanEvent->SetName("/oneagent/qianlu/local/1"); spanEvent->SetKind(SpanEvent::Kind::Client); spanEvent->SetStatus(SpanEvent::StatusCode::Ok); spanEvent->SetSpanId("span-1-2-3-4-5"); spanEvent->SetTraceId("trace-1-2-3-4-5"); spanEvent->SetParentSpanId("parent-1-2-3-4-5"); spanEvent->SetTraceState("test-state"); spanEvent->SetStartTimeNs(1000); spanEvent->SetEndTimeNs(2000); spanEvent->SetTimestamp(seconds); BatchedEvents batch(std::move(group.MutableEvents()), std::move(group.GetSizedTags()), std::move(group.GetSourceBuffer()), group.GetMetadata(EventGroupMetaKey::SOURCE_ID), std::move(group.GetExactlyOnceCheckpoint())); return batch; } UNIT_TEST_CASE(SLSSerializerUnittest, TestSerializeEventGroup) UNIT_TEST_CASE(SLSSerializerUnittest, TestSerializeEventGroupList) } // namespace logtail UNIT_TEST_MAIN