void SLSSerializerUnittest::TestSerializeEventGroup()

in core/unittest/serializer/SLSSerializerUnittest.cpp [55:425]


void SLSSerializerUnittest::TestSerializeEventGroup() {
    SLSEventGroupSerializer serializer(sFlusher.get());
    { // log
        { // nano second disabled, and set
            string res, errorMsg;
            APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedLogEvents(false), res, errorMsg));
            sls_logs::LogGroup logGroup;
            APSARA_TEST_TRUE(logGroup.ParseFromString(res));
            APSARA_TEST_EQUAL(1, logGroup.logs_size());
            APSARA_TEST_EQUAL(1, logGroup.logs(0).contents_size());
            APSARA_TEST_STREQ("key", logGroup.logs(0).contents(0).key().c_str());
            APSARA_TEST_STREQ("value", logGroup.logs(0).contents(0).value().c_str());
            APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
            APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns());
            APSARA_TEST_EQUAL(1, logGroup.logtags_size());
            APSARA_TEST_STREQ("__pack_id__", logGroup.logtags(0).key().c_str());
            APSARA_TEST_STREQ("pack_id", logGroup.logtags(0).value().c_str());
            APSARA_TEST_STREQ("machine_uuid", logGroup.machineuuid().c_str());
            APSARA_TEST_STREQ("source", logGroup.source().c_str());
            APSARA_TEST_STREQ("topic", logGroup.topic().c_str());
        }
        {
            // nano second enabled, and set
            const_cast<GlobalConfig&>(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = true;
            string res, errorMsg;
            APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedLogEvents(true), res, errorMsg));
            sls_logs::LogGroup logGroup;
            APSARA_TEST_TRUE(logGroup.ParseFromString(res));
            APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
            APSARA_TEST_EQUAL(1U, logGroup.logs(0).time_ns());
            const_cast<GlobalConfig&>(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = false;
        }
        {
            // nano second enabled, not set
            const_cast<GlobalConfig&>(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = true;
            string res, errorMsg;
            APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedLogEvents(false), res, errorMsg));
            sls_logs::LogGroup logGroup;
            APSARA_TEST_TRUE(logGroup.ParseFromString(res));
            APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
            APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns());
            const_cast<GlobalConfig&>(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = false;
        }
        { // with empty event
            string res, errorMsg;
            APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedLogEvents(false, true, true), res, errorMsg));
            sls_logs::LogGroup logGroup;
            APSARA_TEST_TRUE(logGroup.ParseFromString(res));
            APSARA_TEST_EQUAL(1, logGroup.logs_size());
            APSARA_TEST_EQUAL(1, logGroup.logs(0).contents_size());
            APSARA_TEST_STREQ("key", logGroup.logs(0).contents(0).key().c_str());
            APSARA_TEST_STREQ("value", logGroup.logs(0).contents(0).value().c_str());
            APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
            APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns());
            APSARA_TEST_EQUAL(1, logGroup.logtags_size());
            APSARA_TEST_STREQ("__pack_id__", logGroup.logtags(0).key().c_str());
            APSARA_TEST_STREQ("pack_id", logGroup.logtags(0).value().c_str());
            APSARA_TEST_STREQ("machine_uuid", logGroup.machineuuid().c_str());
            APSARA_TEST_STREQ("source", logGroup.source().c_str());
            APSARA_TEST_STREQ("topic", logGroup.topic().c_str());
        }
        {
            // only empty event
            string res, errorMsg;
            APSARA_TEST_FALSE(serializer.DoSerialize(CreateBatchedLogEvents(false, true, false), res, errorMsg));
        }
    } // namespace logtail
    { // metric
        { // only 1 tag
            string res, errorMsg;
            APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedMetricEvents(false, 0, false, true), res, errorMsg));
            sls_logs::LogGroup logGroup;
            APSARA_TEST_TRUE(logGroup.ParseFromString(res));

            APSARA_TEST_EQUAL(1, logGroup.logs_size());
            APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
            APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns());
            APSARA_TEST_EQUAL(logGroup.logs(0).contents_size(), 4);
            APSARA_TEST_EQUAL(logGroup.logs(0).contents(0).key(), "__labels__");
            APSARA_TEST_EQUAL(logGroup.logs(0).contents(0).value(), "key1#$#value1");
            APSARA_TEST_EQUAL(logGroup.logs(0).contents(1).key(), "__time_nano__");
            APSARA_TEST_EQUAL(logGroup.logs(0).contents(1).value(), "1234567890");
            APSARA_TEST_EQUAL(logGroup.logs(0).contents(2).key(), "__value__");
            APSARA_TEST_EQUAL(logGroup.logs(0).contents(2).value(), "0.100000");
            APSARA_TEST_EQUAL(logGroup.logs(0).contents(3).key(), "__name__");
            APSARA_TEST_EQUAL(logGroup.logs(0).contents(3).value(), "test_gauge");
            APSARA_TEST_EQUAL(1, logGroup.logtags_size());
            APSARA_TEST_STREQ("__pack_id__", logGroup.logtags(0).key().c_str());
            APSARA_TEST_STREQ("pack_id", logGroup.logtags(0).value().c_str());
            APSARA_TEST_STREQ("machine_uuid", logGroup.machineuuid().c_str());
            APSARA_TEST_STREQ("source", logGroup.source().c_str());
            APSARA_TEST_STREQ("topic", logGroup.topic().c_str());
        }
        {
            // nano second disabled
            string res, errorMsg;
            APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedMetricEvents(false, 0, false, false), res, errorMsg));
            sls_logs::LogGroup logGroup;
            APSARA_TEST_TRUE(logGroup.ParseFromString(res));

            APSARA_TEST_EQUAL(1, logGroup.logs_size());
            APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
            APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns());
            APSARA_TEST_EQUAL(logGroup.logs(0).contents_size(), 4);
            APSARA_TEST_EQUAL(logGroup.logs(0).contents(0).key(), "__labels__");
            APSARA_TEST_EQUAL(logGroup.logs(0).contents(0).value(), "key1#$#value1|key2#$#value2");
            APSARA_TEST_EQUAL(logGroup.logs(0).contents(1).key(), "__time_nano__");
            APSARA_TEST_EQUAL(logGroup.logs(0).contents(1).value(), "1234567890");
            APSARA_TEST_EQUAL(logGroup.logs(0).contents(2).key(), "__value__");
            APSARA_TEST_EQUAL(logGroup.logs(0).contents(2).value(), "0.100000");
            APSARA_TEST_EQUAL(logGroup.logs(0).contents(3).key(), "__name__");
            APSARA_TEST_EQUAL(logGroup.logs(0).contents(3).value(), "test_gauge");
            APSARA_TEST_EQUAL(1, logGroup.logtags_size());
            APSARA_TEST_STREQ("__pack_id__", logGroup.logtags(0).key().c_str());
            APSARA_TEST_STREQ("pack_id", logGroup.logtags(0).value().c_str());
            APSARA_TEST_STREQ("machine_uuid", logGroup.machineuuid().c_str());
            APSARA_TEST_STREQ("source", logGroup.source().c_str());
            APSARA_TEST_STREQ("topic", logGroup.topic().c_str());
        }
        {
            // nano second enabled, less than 9 digits
            string res, errorMsg;
            APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedMetricEvents(true, 1, false, false), res, errorMsg));
            sls_logs::LogGroup logGroup;
            APSARA_TEST_TRUE(logGroup.ParseFromString(res));

            APSARA_TEST_EQUAL(1, logGroup.logs_size());
            APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
            APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns());
            APSARA_TEST_EQUAL(logGroup.logs(0).contents_size(), 4);
            APSARA_TEST_EQUAL(logGroup.logs(0).contents(0).key(), "__labels__");
            APSARA_TEST_EQUAL(logGroup.logs(0).contents(0).value(), "key1#$#value1|key2#$#value2");
            APSARA_TEST_EQUAL(logGroup.logs(0).contents(1).key(), "__time_nano__");
            APSARA_TEST_EQUAL(logGroup.logs(0).contents(1).value(), "1234567890000000001");
            APSARA_TEST_EQUAL(logGroup.logs(0).contents(2).key(), "__value__");
            APSARA_TEST_EQUAL(logGroup.logs(0).contents(2).value(), "0.100000");
            APSARA_TEST_EQUAL(logGroup.logs(0).contents(3).key(), "__name__");
            APSARA_TEST_EQUAL(logGroup.logs(0).contents(3).value(), "test_gauge");
            APSARA_TEST_EQUAL(1, logGroup.logtags_size());
            APSARA_TEST_STREQ("__pack_id__", logGroup.logtags(0).key().c_str());
            APSARA_TEST_STREQ("pack_id", logGroup.logtags(0).value().c_str());
            APSARA_TEST_STREQ("machine_uuid", logGroup.machineuuid().c_str());
            APSARA_TEST_STREQ("source", logGroup.source().c_str());
            APSARA_TEST_STREQ("topic", logGroup.topic().c_str());
        }
        {
            // nano second enabled, exactly 9 digits
            string res, errorMsg;
            APSARA_TEST_TRUE(
                serializer.DoSerialize(CreateBatchedMetricEvents(true, 999999999, false, false), res, errorMsg));
            sls_logs::LogGroup logGroup;
            APSARA_TEST_TRUE(logGroup.ParseFromString(res));

            APSARA_TEST_EQUAL(1, logGroup.logs_size());
            APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
            APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns());
            APSARA_TEST_EQUAL(logGroup.logs(0).contents_size(), 4);
            APSARA_TEST_EQUAL(logGroup.logs(0).contents(0).key(), "__labels__");
            APSARA_TEST_EQUAL(logGroup.logs(0).contents(0).value(), "key1#$#value1|key2#$#value2");
            APSARA_TEST_EQUAL(logGroup.logs(0).contents(1).key(), "__time_nano__");
            APSARA_TEST_EQUAL(logGroup.logs(0).contents(1).value(), "1234567890999999999");
            APSARA_TEST_EQUAL(logGroup.logs(0).contents(2).key(), "__value__");
            APSARA_TEST_EQUAL(logGroup.logs(0).contents(2).value(), "0.100000");
            APSARA_TEST_EQUAL(logGroup.logs(0).contents(3).key(), "__name__");
            APSARA_TEST_EQUAL(logGroup.logs(0).contents(3).value(), "test_gauge");
            APSARA_TEST_EQUAL(1, logGroup.logtags_size());
            APSARA_TEST_STREQ("__pack_id__", logGroup.logtags(0).key().c_str());
            APSARA_TEST_STREQ("pack_id", logGroup.logtags(0).value().c_str());
            APSARA_TEST_STREQ("machine_uuid", logGroup.machineuuid().c_str());
            APSARA_TEST_STREQ("source", logGroup.source().c_str());
            APSARA_TEST_STREQ("topic", logGroup.topic().c_str());
        }
        {
            // timestamp invalid
            string res, errorMsg;
            auto batch = CreateBatchedMetricEvents(false, 0, false, false);
            batch.mEvents[0]->SetTimestamp(123);
            APSARA_TEST_FALSE(serializer.DoSerialize(std::move(batch), res, errorMsg));
        }
        {
            // empty metric value
            string res, errorMsg;
            APSARA_TEST_FALSE(serializer.DoSerialize(CreateBatchedMetricEvents(false, 0, true, false), res, errorMsg));
        }
    }
    {
        // span
        string res, errorMsg;
        auto events = CreateBatchedSpanEvents();
        APSARA_TEST_EQUAL(events.mEvents.size(), 1U);
        APSARA_TEST_TRUE(events.mEvents[0]->GetType() == PipelineEvent::Type::SPAN);
        APSARA_TEST_TRUE(serializer.DoSerialize(std::move(events), res, errorMsg));
        sls_logs::LogGroup logGroup;
        APSARA_TEST_TRUE(logGroup.ParseFromString(res));
        APSARA_TEST_EQUAL(1, logGroup.logs_size());
        APSARA_TEST_EQUAL(13, logGroup.logs(0).contents_size());
        // traceid
        APSARA_TEST_EQUAL(logGroup.logs(0).contents(0).key(), "traceId");
        APSARA_TEST_EQUAL(logGroup.logs(0).contents(0).value(), "trace-1-2-3-4-5");
        // span id
        APSARA_TEST_EQUAL(logGroup.logs(0).contents(1).key(), "spanId");
        APSARA_TEST_EQUAL(logGroup.logs(0).contents(1).value(), "span-1-2-3-4-5");
        // parent span id
        APSARA_TEST_EQUAL(logGroup.logs(0).contents(2).key(), "parentSpanId");
        APSARA_TEST_EQUAL(logGroup.logs(0).contents(2).value(), "parent-1-2-3-4-5");
        // spanName
        APSARA_TEST_EQUAL(logGroup.logs(0).contents(3).key(), "spanName");
        APSARA_TEST_EQUAL(logGroup.logs(0).contents(3).value(), "/oneagent/qianlu/local/1");
        // kind
        APSARA_TEST_EQUAL(logGroup.logs(0).contents(4).key(), "kind");
        APSARA_TEST_EQUAL(logGroup.logs(0).contents(4).value(), "client");
        // code
        APSARA_TEST_EQUAL(logGroup.logs(0).contents(5).key(), "statusCode");
        APSARA_TEST_EQUAL(logGroup.logs(0).contents(5).value(), "OK");
        // traceState
        APSARA_TEST_EQUAL(logGroup.logs(0).contents(6).key(), "traceState");
        APSARA_TEST_EQUAL(logGroup.logs(0).contents(6).value(), "test-state");
        // attributes
        APSARA_TEST_EQUAL(logGroup.logs(0).contents(7).key(), "attributes");
        auto attrs = logGroup.logs(0).contents(7).value();
        Json::Value jsonVal;
        Json::CharReaderBuilder readerBuilder;
        std::string errs;

        std::istringstream s(attrs);
        bool ret = Json::parseFromStream(readerBuilder, s, &jsonVal, &errs);
        APSARA_TEST_TRUE(ret);
        APSARA_TEST_EQUAL(jsonVal.size(), 10U);
        APSARA_TEST_EQUAL(jsonVal["rpcType"].asString(), "25");
        APSARA_TEST_EQUAL(jsonVal["scope-tag-0"].asString(), "scope-value-0");
        // APSARA_TEST_EQUAL(logGroup.logs(0).contents(7).value(), "");
        // links
        APSARA_TEST_EQUAL(logGroup.logs(0).contents(8).key(), "links");

        auto linksStr = logGroup.logs(0).contents(8).value();

        std::istringstream ss(linksStr);
        ret = Json::parseFromStream(readerBuilder, ss, &jsonVal, &errs);
        APSARA_TEST_TRUE(ret);
        APSARA_TEST_EQUAL(jsonVal.size(), 1U);
        for (auto& link : jsonVal) {
            APSARA_TEST_EQUAL(link["spanId"].asString(), "inner-link-spanid");
            APSARA_TEST_EQUAL(link["traceId"].asString(), "inner-link-traceid");
            APSARA_TEST_EQUAL(link["traceState"].asString(), "inner-link-trace-state");
        }
        // events
        APSARA_TEST_EQUAL(logGroup.logs(0).contents(9).key(), "events");
        auto eventsStr = logGroup.logs(0).contents(9).value();
        std::istringstream sss(eventsStr);
        ret = Json::parseFromStream(readerBuilder, sss, &jsonVal, &errs);
        APSARA_TEST_TRUE(ret);
        APSARA_TEST_EQUAL(jsonVal.size(), 1U);
        for (auto& event : jsonVal) {
            APSARA_TEST_EQUAL(event["name"].asString(), "inner-event");
            APSARA_TEST_EQUAL(event["timestamp"].asString(), "1000");
        }
        // start
        APSARA_TEST_EQUAL(logGroup.logs(0).contents(10).key(), "startTime");
        APSARA_TEST_EQUAL(logGroup.logs(0).contents(10).value(), "1000");

        // end
        APSARA_TEST_EQUAL(logGroup.logs(0).contents(11).key(), "endTime");
        APSARA_TEST_EQUAL(logGroup.logs(0).contents(11).value(), "2000");

        // duration
        APSARA_TEST_EQUAL(logGroup.logs(0).contents(12).key(), "duration");
        APSARA_TEST_EQUAL(logGroup.logs(0).contents(12).value(), "1000");
    }
    { // raw
        { // nano second disabled, and set
            string res, errorMsg;
            APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedRawEvents(false), res, errorMsg));
            sls_logs::LogGroup logGroup;
            APSARA_TEST_TRUE(logGroup.ParseFromString(res));
            APSARA_TEST_EQUAL(1, logGroup.logs_size());
            APSARA_TEST_EQUAL(1, logGroup.logs(0).contents_size());
            APSARA_TEST_STREQ("content", logGroup.logs(0).contents(0).key().c_str());
            APSARA_TEST_STREQ("value", logGroup.logs(0).contents(0).value().c_str());
            APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
            APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns());
            APSARA_TEST_EQUAL(1, logGroup.logtags_size());
            APSARA_TEST_STREQ("__pack_id__", logGroup.logtags(0).key().c_str());
            APSARA_TEST_STREQ("pack_id", logGroup.logtags(0).value().c_str());
            APSARA_TEST_STREQ("machine_uuid", logGroup.machineuuid().c_str());
            APSARA_TEST_STREQ("source", logGroup.source().c_str());
            APSARA_TEST_STREQ("topic", logGroup.topic().c_str());
        }
        {
            // nano second enabled, and set
            const_cast<GlobalConfig&>(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = true;
            string res, errorMsg;
            APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedRawEvents(true), res, errorMsg));
            sls_logs::LogGroup logGroup;
            APSARA_TEST_TRUE(logGroup.ParseFromString(res));
            APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
            APSARA_TEST_EQUAL(1U, logGroup.logs(0).time_ns());
            const_cast<GlobalConfig&>(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = false;
        }
        {
            // nano second enabled, not set
            const_cast<GlobalConfig&>(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = true;
            string res, errorMsg;
            APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedRawEvents(false), res, errorMsg));
            sls_logs::LogGroup logGroup;
            APSARA_TEST_TRUE(logGroup.ParseFromString(res));
            APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
            APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns());
            const_cast<GlobalConfig&>(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = false;
        }
        { // with empty event
            string res, errorMsg;
            APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedRawEvents(false, true, true), res, errorMsg));
            sls_logs::LogGroup logGroup;
            APSARA_TEST_TRUE(logGroup.ParseFromString(res));
            APSARA_TEST_EQUAL(2, logGroup.logs_size());
            APSARA_TEST_EQUAL(1, logGroup.logs(0).contents_size());
            APSARA_TEST_STREQ("content", logGroup.logs(0).contents(0).key().c_str());
            APSARA_TEST_STREQ("value", logGroup.logs(0).contents(0).value().c_str());
            APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
            APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns());
            APSARA_TEST_EQUAL(1, logGroup.logs(1).contents_size());
            APSARA_TEST_STREQ("content", logGroup.logs(1).contents(0).key().c_str());
            APSARA_TEST_STREQ("", logGroup.logs(1).contents(0).value().c_str());
            APSARA_TEST_EQUAL(1234567890U, logGroup.logs(1).time());
            APSARA_TEST_FALSE(logGroup.logs(1).has_time_ns());
            APSARA_TEST_EQUAL(1, logGroup.logtags_size());
            APSARA_TEST_STREQ("__pack_id__", logGroup.logtags(0).key().c_str());
            APSARA_TEST_STREQ("pack_id", logGroup.logtags(0).value().c_str());
            APSARA_TEST_STREQ("machine_uuid", logGroup.machineuuid().c_str());
            APSARA_TEST_STREQ("source", logGroup.source().c_str());
            APSARA_TEST_STREQ("topic", logGroup.topic().c_str());
        }
        {
            // only empty event
            string res, errorMsg;
            APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedRawEvents(false, true, false), res, errorMsg));
            sls_logs::LogGroup logGroup;
            APSARA_TEST_TRUE(logGroup.ParseFromString(res));
            APSARA_TEST_EQUAL(1, logGroup.logs_size());
            APSARA_TEST_EQUAL(1, logGroup.logs(0).contents_size());
            APSARA_TEST_STREQ("content", logGroup.logs(0).contents(0).key().c_str());
            APSARA_TEST_STREQ("", logGroup.logs(0).contents(0).value().c_str());
            APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
            APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns());
            APSARA_TEST_EQUAL(1, logGroup.logtags_size());
            APSARA_TEST_STREQ("__pack_id__", logGroup.logtags(0).key().c_str());
            APSARA_TEST_STREQ("pack_id", logGroup.logtags(0).value().c_str());
            APSARA_TEST_STREQ("machine_uuid", logGroup.machineuuid().c_str());
            APSARA_TEST_STREQ("source", logGroup.source().c_str());
            APSARA_TEST_STREQ("topic", logGroup.topic().c_str());
        }
    }
    {
        // log group exceed size limit
        INT32_FLAG(max_send_log_group_size) = 0;
        string res, errorMsg;
        APSARA_TEST_FALSE(serializer.DoSerialize(CreateBatchedLogEvents(true, false), res, errorMsg));
        INT32_FLAG(max_send_log_group_size) = 10 * 1024 * 1024;
    }
    {
        // empty log group
        PipelineEventGroup group(make_shared<SourceBuffer>());
        BatchedEvents batch(std::move(group.MutableEvents()),
                            std::move(group.GetSizedTags()),
                            std::move(group.GetSourceBuffer()),
                            group.GetMetadata(EventGroupMetaKey::SOURCE_ID),
                            std::move(group.GetExactlyOnceCheckpoint()));
        string res, errorMsg;
        APSARA_TEST_FALSE(serializer.DoSerialize(std::move(batch), res, errorMsg));
    }
}