in core/unittest/serializer/SLSSerializerUnittest.cpp [55:425]
void SLSSerializerUnittest::TestSerializeEventGroup() {
SLSEventGroupSerializer serializer(sFlusher.get());
{ // log
{ // nano second disabled, and set
string res, errorMsg;
APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedLogEvents(false), res, errorMsg));
sls_logs::LogGroup logGroup;
APSARA_TEST_TRUE(logGroup.ParseFromString(res));
APSARA_TEST_EQUAL(1, logGroup.logs_size());
APSARA_TEST_EQUAL(1, logGroup.logs(0).contents_size());
APSARA_TEST_STREQ("key", logGroup.logs(0).contents(0).key().c_str());
APSARA_TEST_STREQ("value", logGroup.logs(0).contents(0).value().c_str());
APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns());
APSARA_TEST_EQUAL(1, logGroup.logtags_size());
APSARA_TEST_STREQ("__pack_id__", logGroup.logtags(0).key().c_str());
APSARA_TEST_STREQ("pack_id", logGroup.logtags(0).value().c_str());
APSARA_TEST_STREQ("machine_uuid", logGroup.machineuuid().c_str());
APSARA_TEST_STREQ("source", logGroup.source().c_str());
APSARA_TEST_STREQ("topic", logGroup.topic().c_str());
}
{
// nano second enabled, and set
const_cast<GlobalConfig&>(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = true;
string res, errorMsg;
APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedLogEvents(true), res, errorMsg));
sls_logs::LogGroup logGroup;
APSARA_TEST_TRUE(logGroup.ParseFromString(res));
APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
APSARA_TEST_EQUAL(1U, logGroup.logs(0).time_ns());
const_cast<GlobalConfig&>(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = false;
}
{
// nano second enabled, not set
const_cast<GlobalConfig&>(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = true;
string res, errorMsg;
APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedLogEvents(false), res, errorMsg));
sls_logs::LogGroup logGroup;
APSARA_TEST_TRUE(logGroup.ParseFromString(res));
APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns());
const_cast<GlobalConfig&>(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = false;
}
{ // with empty event
string res, errorMsg;
APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedLogEvents(false, true, true), res, errorMsg));
sls_logs::LogGroup logGroup;
APSARA_TEST_TRUE(logGroup.ParseFromString(res));
APSARA_TEST_EQUAL(1, logGroup.logs_size());
APSARA_TEST_EQUAL(1, logGroup.logs(0).contents_size());
APSARA_TEST_STREQ("key", logGroup.logs(0).contents(0).key().c_str());
APSARA_TEST_STREQ("value", logGroup.logs(0).contents(0).value().c_str());
APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns());
APSARA_TEST_EQUAL(1, logGroup.logtags_size());
APSARA_TEST_STREQ("__pack_id__", logGroup.logtags(0).key().c_str());
APSARA_TEST_STREQ("pack_id", logGroup.logtags(0).value().c_str());
APSARA_TEST_STREQ("machine_uuid", logGroup.machineuuid().c_str());
APSARA_TEST_STREQ("source", logGroup.source().c_str());
APSARA_TEST_STREQ("topic", logGroup.topic().c_str());
}
{
// only empty event
string res, errorMsg;
APSARA_TEST_FALSE(serializer.DoSerialize(CreateBatchedLogEvents(false, true, false), res, errorMsg));
}
} // namespace logtail
{ // metric
{ // only 1 tag
string res, errorMsg;
APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedMetricEvents(false, 0, false, true), res, errorMsg));
sls_logs::LogGroup logGroup;
APSARA_TEST_TRUE(logGroup.ParseFromString(res));
APSARA_TEST_EQUAL(1, logGroup.logs_size());
APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns());
APSARA_TEST_EQUAL(logGroup.logs(0).contents_size(), 4);
APSARA_TEST_EQUAL(logGroup.logs(0).contents(0).key(), "__labels__");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(0).value(), "key1#$#value1");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(1).key(), "__time_nano__");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(1).value(), "1234567890");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(2).key(), "__value__");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(2).value(), "0.100000");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(3).key(), "__name__");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(3).value(), "test_gauge");
APSARA_TEST_EQUAL(1, logGroup.logtags_size());
APSARA_TEST_STREQ("__pack_id__", logGroup.logtags(0).key().c_str());
APSARA_TEST_STREQ("pack_id", logGroup.logtags(0).value().c_str());
APSARA_TEST_STREQ("machine_uuid", logGroup.machineuuid().c_str());
APSARA_TEST_STREQ("source", logGroup.source().c_str());
APSARA_TEST_STREQ("topic", logGroup.topic().c_str());
}
{
// nano second disabled
string res, errorMsg;
APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedMetricEvents(false, 0, false, false), res, errorMsg));
sls_logs::LogGroup logGroup;
APSARA_TEST_TRUE(logGroup.ParseFromString(res));
APSARA_TEST_EQUAL(1, logGroup.logs_size());
APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns());
APSARA_TEST_EQUAL(logGroup.logs(0).contents_size(), 4);
APSARA_TEST_EQUAL(logGroup.logs(0).contents(0).key(), "__labels__");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(0).value(), "key1#$#value1|key2#$#value2");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(1).key(), "__time_nano__");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(1).value(), "1234567890");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(2).key(), "__value__");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(2).value(), "0.100000");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(3).key(), "__name__");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(3).value(), "test_gauge");
APSARA_TEST_EQUAL(1, logGroup.logtags_size());
APSARA_TEST_STREQ("__pack_id__", logGroup.logtags(0).key().c_str());
APSARA_TEST_STREQ("pack_id", logGroup.logtags(0).value().c_str());
APSARA_TEST_STREQ("machine_uuid", logGroup.machineuuid().c_str());
APSARA_TEST_STREQ("source", logGroup.source().c_str());
APSARA_TEST_STREQ("topic", logGroup.topic().c_str());
}
{
// nano second enabled, less than 9 digits
string res, errorMsg;
APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedMetricEvents(true, 1, false, false), res, errorMsg));
sls_logs::LogGroup logGroup;
APSARA_TEST_TRUE(logGroup.ParseFromString(res));
APSARA_TEST_EQUAL(1, logGroup.logs_size());
APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns());
APSARA_TEST_EQUAL(logGroup.logs(0).contents_size(), 4);
APSARA_TEST_EQUAL(logGroup.logs(0).contents(0).key(), "__labels__");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(0).value(), "key1#$#value1|key2#$#value2");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(1).key(), "__time_nano__");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(1).value(), "1234567890000000001");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(2).key(), "__value__");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(2).value(), "0.100000");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(3).key(), "__name__");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(3).value(), "test_gauge");
APSARA_TEST_EQUAL(1, logGroup.logtags_size());
APSARA_TEST_STREQ("__pack_id__", logGroup.logtags(0).key().c_str());
APSARA_TEST_STREQ("pack_id", logGroup.logtags(0).value().c_str());
APSARA_TEST_STREQ("machine_uuid", logGroup.machineuuid().c_str());
APSARA_TEST_STREQ("source", logGroup.source().c_str());
APSARA_TEST_STREQ("topic", logGroup.topic().c_str());
}
{
// nano second enabled, exactly 9 digits
string res, errorMsg;
APSARA_TEST_TRUE(
serializer.DoSerialize(CreateBatchedMetricEvents(true, 999999999, false, false), res, errorMsg));
sls_logs::LogGroup logGroup;
APSARA_TEST_TRUE(logGroup.ParseFromString(res));
APSARA_TEST_EQUAL(1, logGroup.logs_size());
APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns());
APSARA_TEST_EQUAL(logGroup.logs(0).contents_size(), 4);
APSARA_TEST_EQUAL(logGroup.logs(0).contents(0).key(), "__labels__");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(0).value(), "key1#$#value1|key2#$#value2");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(1).key(), "__time_nano__");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(1).value(), "1234567890999999999");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(2).key(), "__value__");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(2).value(), "0.100000");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(3).key(), "__name__");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(3).value(), "test_gauge");
APSARA_TEST_EQUAL(1, logGroup.logtags_size());
APSARA_TEST_STREQ("__pack_id__", logGroup.logtags(0).key().c_str());
APSARA_TEST_STREQ("pack_id", logGroup.logtags(0).value().c_str());
APSARA_TEST_STREQ("machine_uuid", logGroup.machineuuid().c_str());
APSARA_TEST_STREQ("source", logGroup.source().c_str());
APSARA_TEST_STREQ("topic", logGroup.topic().c_str());
}
{
// timestamp invalid
string res, errorMsg;
auto batch = CreateBatchedMetricEvents(false, 0, false, false);
batch.mEvents[0]->SetTimestamp(123);
APSARA_TEST_FALSE(serializer.DoSerialize(std::move(batch), res, errorMsg));
}
{
// empty metric value
string res, errorMsg;
APSARA_TEST_FALSE(serializer.DoSerialize(CreateBatchedMetricEvents(false, 0, true, false), res, errorMsg));
}
}
{
// span
string res, errorMsg;
auto events = CreateBatchedSpanEvents();
APSARA_TEST_EQUAL(events.mEvents.size(), 1U);
APSARA_TEST_TRUE(events.mEvents[0]->GetType() == PipelineEvent::Type::SPAN);
APSARA_TEST_TRUE(serializer.DoSerialize(std::move(events), res, errorMsg));
sls_logs::LogGroup logGroup;
APSARA_TEST_TRUE(logGroup.ParseFromString(res));
APSARA_TEST_EQUAL(1, logGroup.logs_size());
APSARA_TEST_EQUAL(13, logGroup.logs(0).contents_size());
// traceid
APSARA_TEST_EQUAL(logGroup.logs(0).contents(0).key(), "traceId");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(0).value(), "trace-1-2-3-4-5");
// span id
APSARA_TEST_EQUAL(logGroup.logs(0).contents(1).key(), "spanId");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(1).value(), "span-1-2-3-4-5");
// parent span id
APSARA_TEST_EQUAL(logGroup.logs(0).contents(2).key(), "parentSpanId");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(2).value(), "parent-1-2-3-4-5");
// spanName
APSARA_TEST_EQUAL(logGroup.logs(0).contents(3).key(), "spanName");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(3).value(), "/oneagent/qianlu/local/1");
// kind
APSARA_TEST_EQUAL(logGroup.logs(0).contents(4).key(), "kind");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(4).value(), "client");
// code
APSARA_TEST_EQUAL(logGroup.logs(0).contents(5).key(), "statusCode");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(5).value(), "OK");
// traceState
APSARA_TEST_EQUAL(logGroup.logs(0).contents(6).key(), "traceState");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(6).value(), "test-state");
// attributes
APSARA_TEST_EQUAL(logGroup.logs(0).contents(7).key(), "attributes");
auto attrs = logGroup.logs(0).contents(7).value();
Json::Value jsonVal;
Json::CharReaderBuilder readerBuilder;
std::string errs;
std::istringstream s(attrs);
bool ret = Json::parseFromStream(readerBuilder, s, &jsonVal, &errs);
APSARA_TEST_TRUE(ret);
APSARA_TEST_EQUAL(jsonVal.size(), 10U);
APSARA_TEST_EQUAL(jsonVal["rpcType"].asString(), "25");
APSARA_TEST_EQUAL(jsonVal["scope-tag-0"].asString(), "scope-value-0");
// APSARA_TEST_EQUAL(logGroup.logs(0).contents(7).value(), "");
// links
APSARA_TEST_EQUAL(logGroup.logs(0).contents(8).key(), "links");
auto linksStr = logGroup.logs(0).contents(8).value();
std::istringstream ss(linksStr);
ret = Json::parseFromStream(readerBuilder, ss, &jsonVal, &errs);
APSARA_TEST_TRUE(ret);
APSARA_TEST_EQUAL(jsonVal.size(), 1U);
for (auto& link : jsonVal) {
APSARA_TEST_EQUAL(link["spanId"].asString(), "inner-link-spanid");
APSARA_TEST_EQUAL(link["traceId"].asString(), "inner-link-traceid");
APSARA_TEST_EQUAL(link["traceState"].asString(), "inner-link-trace-state");
}
// events
APSARA_TEST_EQUAL(logGroup.logs(0).contents(9).key(), "events");
auto eventsStr = logGroup.logs(0).contents(9).value();
std::istringstream sss(eventsStr);
ret = Json::parseFromStream(readerBuilder, sss, &jsonVal, &errs);
APSARA_TEST_TRUE(ret);
APSARA_TEST_EQUAL(jsonVal.size(), 1U);
for (auto& event : jsonVal) {
APSARA_TEST_EQUAL(event["name"].asString(), "inner-event");
APSARA_TEST_EQUAL(event["timestamp"].asString(), "1000");
}
// start
APSARA_TEST_EQUAL(logGroup.logs(0).contents(10).key(), "startTime");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(10).value(), "1000");
// end
APSARA_TEST_EQUAL(logGroup.logs(0).contents(11).key(), "endTime");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(11).value(), "2000");
// duration
APSARA_TEST_EQUAL(logGroup.logs(0).contents(12).key(), "duration");
APSARA_TEST_EQUAL(logGroup.logs(0).contents(12).value(), "1000");
}
{ // raw
{ // nano second disabled, and set
string res, errorMsg;
APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedRawEvents(false), res, errorMsg));
sls_logs::LogGroup logGroup;
APSARA_TEST_TRUE(logGroup.ParseFromString(res));
APSARA_TEST_EQUAL(1, logGroup.logs_size());
APSARA_TEST_EQUAL(1, logGroup.logs(0).contents_size());
APSARA_TEST_STREQ("content", logGroup.logs(0).contents(0).key().c_str());
APSARA_TEST_STREQ("value", logGroup.logs(0).contents(0).value().c_str());
APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns());
APSARA_TEST_EQUAL(1, logGroup.logtags_size());
APSARA_TEST_STREQ("__pack_id__", logGroup.logtags(0).key().c_str());
APSARA_TEST_STREQ("pack_id", logGroup.logtags(0).value().c_str());
APSARA_TEST_STREQ("machine_uuid", logGroup.machineuuid().c_str());
APSARA_TEST_STREQ("source", logGroup.source().c_str());
APSARA_TEST_STREQ("topic", logGroup.topic().c_str());
}
{
// nano second enabled, and set
const_cast<GlobalConfig&>(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = true;
string res, errorMsg;
APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedRawEvents(true), res, errorMsg));
sls_logs::LogGroup logGroup;
APSARA_TEST_TRUE(logGroup.ParseFromString(res));
APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
APSARA_TEST_EQUAL(1U, logGroup.logs(0).time_ns());
const_cast<GlobalConfig&>(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = false;
}
{
// nano second enabled, not set
const_cast<GlobalConfig&>(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = true;
string res, errorMsg;
APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedRawEvents(false), res, errorMsg));
sls_logs::LogGroup logGroup;
APSARA_TEST_TRUE(logGroup.ParseFromString(res));
APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns());
const_cast<GlobalConfig&>(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = false;
}
{ // with empty event
string res, errorMsg;
APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedRawEvents(false, true, true), res, errorMsg));
sls_logs::LogGroup logGroup;
APSARA_TEST_TRUE(logGroup.ParseFromString(res));
APSARA_TEST_EQUAL(2, logGroup.logs_size());
APSARA_TEST_EQUAL(1, logGroup.logs(0).contents_size());
APSARA_TEST_STREQ("content", logGroup.logs(0).contents(0).key().c_str());
APSARA_TEST_STREQ("value", logGroup.logs(0).contents(0).value().c_str());
APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns());
APSARA_TEST_EQUAL(1, logGroup.logs(1).contents_size());
APSARA_TEST_STREQ("content", logGroup.logs(1).contents(0).key().c_str());
APSARA_TEST_STREQ("", logGroup.logs(1).contents(0).value().c_str());
APSARA_TEST_EQUAL(1234567890U, logGroup.logs(1).time());
APSARA_TEST_FALSE(logGroup.logs(1).has_time_ns());
APSARA_TEST_EQUAL(1, logGroup.logtags_size());
APSARA_TEST_STREQ("__pack_id__", logGroup.logtags(0).key().c_str());
APSARA_TEST_STREQ("pack_id", logGroup.logtags(0).value().c_str());
APSARA_TEST_STREQ("machine_uuid", logGroup.machineuuid().c_str());
APSARA_TEST_STREQ("source", logGroup.source().c_str());
APSARA_TEST_STREQ("topic", logGroup.topic().c_str());
}
{
// only empty event
string res, errorMsg;
APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedRawEvents(false, true, false), res, errorMsg));
sls_logs::LogGroup logGroup;
APSARA_TEST_TRUE(logGroup.ParseFromString(res));
APSARA_TEST_EQUAL(1, logGroup.logs_size());
APSARA_TEST_EQUAL(1, logGroup.logs(0).contents_size());
APSARA_TEST_STREQ("content", logGroup.logs(0).contents(0).key().c_str());
APSARA_TEST_STREQ("", logGroup.logs(0).contents(0).value().c_str());
APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns());
APSARA_TEST_EQUAL(1, logGroup.logtags_size());
APSARA_TEST_STREQ("__pack_id__", logGroup.logtags(0).key().c_str());
APSARA_TEST_STREQ("pack_id", logGroup.logtags(0).value().c_str());
APSARA_TEST_STREQ("machine_uuid", logGroup.machineuuid().c_str());
APSARA_TEST_STREQ("source", logGroup.source().c_str());
APSARA_TEST_STREQ("topic", logGroup.topic().c_str());
}
}
{
// log group exceed size limit
INT32_FLAG(max_send_log_group_size) = 0;
string res, errorMsg;
APSARA_TEST_FALSE(serializer.DoSerialize(CreateBatchedLogEvents(true, false), res, errorMsg));
INT32_FLAG(max_send_log_group_size) = 10 * 1024 * 1024;
}
{
// empty log group
PipelineEventGroup group(make_shared<SourceBuffer>());
BatchedEvents batch(std::move(group.MutableEvents()),
std::move(group.GetSizedTags()),
std::move(group.GetSourceBuffer()),
group.GetMetadata(EventGroupMetaKey::SOURCE_ID),
std::move(group.GetExactlyOnceCheckpoint()));
string res, errorMsg;
APSARA_TEST_FALSE(serializer.DoSerialize(std::move(batch), res, errorMsg));
}
}