in core/collection_pipeline/serializer/SLSSerializer.cpp [84:332]
/**
 * Serializes a batch of events into a serialized sls_logs::LogGroup (the debug check at the end parses
 * `res` back with sls_logs::LogGroup::ParseFromString).
 *
 * Works in two passes over the events:
 *   1. compute the serialized size of every log (kept in logSZ) and of the whole group, caching strings that
 *      are expensive to build (metric value strings / label sizes, span attributes / links / events / times);
 *   2. write every log plus the group tags into a thread-local LogGroupSerializer prepared with that size.
 *
 * Every event is handled as the type of the first event in the group. Empty log events are skipped; metric
 * events with a timestamp below 1e9 or whose value is not UntypedSingleValue are dropped with a warning.
 * Group tags named LOG_RESERVED_KEY_TOPIC / LOG_RESERVED_KEY_SOURCE / LOG_RESERVED_KEY_MACHINE_UUID are written
 * through AddTopic / AddSource / AddMachineUUID; every other tag is written with AddLogTag.
 *
 * @param group    events and tags to serialize. Metric events get their tags sorted in place (SortTags), and
 *                 the group is moved into a JSON serializer only on the debug_sls_serializer failure path.
 * @param res      receives the serialized bytes.
 * @param errorMsg receives a description of the failure whenever false is returned.
 * @return true on success; false if the group is empty, its event type is NONE, no event is serializable,
 *         the group exceeds max_send_log_group_size, or (with debug_sls_serializer enabled) the produced
 *         bytes cannot be parsed back.
 */
bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, string& errorMsg) {
    if (group.mEvents.empty()) {
        errorMsg = "empty event group";
        return false;
    }
    PipelineEvent::Type eventType = group.mEvents[0]->GetType();
    if (eventType == PipelineEvent::Type::NONE) {
        // should not happen
        errorMsg = "unsupported event type in event group";
        return false;
    }
    bool enableNs = mFlusher->GetContext().GetGlobalConfig().mEnableTimestampNanosecond;

    // Pass 1: calculate the serialized logGroup size first, caching results that pass 2 reuses.
    // logSZ[i] keeps its initial 0 for every event that is skipped.
    vector<size_t> logSZ(group.mEvents.size());
    // Per-event caches; each one is sized only in the branch of the event type that uses it.
    vector<pair<string, size_t>> metricEventContentCache; // {value string, GetMetricLabelSize result}
    vector<array<string, 6>> spanEventContentCache; // {attributes, links, events, start ns, end ns, duration ns}
    size_t logGroupSZ = 0;
    switch (eventType) {
        case PipelineEvent::Type::LOG: {
            for (size_t i = 0; i < group.mEvents.size(); ++i) {
                const auto& e = group.mEvents[i].Cast<LogEvent>();
                if (e.Empty()) {
                    continue;
                }
                size_t contentSZ = 0;
                for (const auto& kv : e) {
                    contentSZ += GetLogContentSize(kv.first.size(), kv.second.size());
                }
                logGroupSZ += GetLogSize(contentSZ, enableNs && e.GetTimestampNanosecond(), logSZ[i]);
            }
            break;
        }
        case PipelineEvent::Type::METRIC: {
            metricEventContentCache.resize(group.mEvents.size());
            for (size_t i = 0; i < group.mEvents.size(); ++i) {
                const auto& e = group.mEvents[i].Cast<MetricEvent>();
                if (e.GetTimestamp() < 1e9) {
                    LOG_WARNING(sLogger,
                                ("metric event timestamp is less than 1e9", "discard event")(
                                    "timestamp", e.GetTimestamp())("config", mFlusher->GetContext().GetConfigName()));
                    continue;
                }
                if (e.Is<UntypedSingleValue>()) {
                    metricEventContentCache[i].first = to_string(e.GetValue<UntypedSingleValue>()->mValue);
                } else {
                    // untyped multi value is not supported
                    LOG_WARNING(sLogger,
                                ("invalid metric event type", "discard event")("config",
                                                                              mFlusher->GetContext().GetConfigName()));
                    continue;
                }
                metricEventContentCache[i].second = GetMetricLabelSize(e);
                size_t contentSZ = 0;
                contentSZ += GetLogContentSize(METRIC_RESERVED_KEY_NAME.size(), e.GetName().size());
                contentSZ
                    += GetLogContentSize(METRIC_RESERVED_KEY_VALUE.size(), metricEventContentCache[i].first.size());
                contentSZ
                    += GetLogContentSize(METRIC_RESERVED_KEY_TIME_NANO.size(), e.GetTimestampNanosecond() ? 19U : 10U);
                contentSZ += GetLogContentSize(METRIC_RESERVED_KEY_LABELS.size(), metricEventContentCache[i].second);
                logGroupSZ += GetLogSize(contentSZ, false, logSZ[i]);
            }
            break;
        }
        case PipelineEvent::Type::SPAN: {
            spanEventContentCache.resize(group.mEvents.size());
            // Loop invariant: built once with its default settings so the attribute JSON text is unchanged.
            Json::StreamWriterBuilder writer;
            for (size_t i = 0; i < group.mEvents.size(); ++i) {
                const auto& e = group.mEvents[i].Cast<SpanEvent>();
                size_t contentSZ = 0;
                contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_TRACE_ID.size(), e.GetTraceId().size());
                contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_SPAN_ID.size(), e.GetSpanId().size());
                contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_PARENT_ID.size(), e.GetParentSpanId().size());
                contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_SPAN_NAME.size(), e.GetName().size());
                contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_SPAN_KIND.size(), GetKindString(e.GetKind()).size());
                contentSZ
                    += GetLogContentSize(DEFAULT_TRACE_TAG_STATUS_CODE.size(), GetStatusString(e.GetStatus()).size());
                contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_TRACE_STATE.size(), e.GetTraceState().size());
                // set tags and scope tags; a scope tag overwrites a span tag with the same key
                Json::Value jsonVal;
                for (auto it = e.TagsBegin(); it != e.TagsEnd(); ++it) {
                    jsonVal[it->first.to_string()] = it->second.to_string();
                }
                for (auto it = e.ScopeTagsBegin(); it != e.ScopeTagsEnd(); ++it) {
                    jsonVal[it->first.to_string()] = it->second.to_string();
                }
                std::string attrString = Json::writeString(writer, jsonVal);
                contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_ATTRIBUTES.size(), attrString.size());
                spanEventContentCache[i][0] = std::move(attrString);
                auto linkString = SerializeSpanLinksToString(e);
                contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_LINKS.size(), linkString.size());
                spanEventContentCache[i][1] = std::move(linkString);
                auto eventString = SerializeSpanEventsToString(e);
                contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_EVENTS.size(), eventString.size());
                spanEventContentCache[i][2] = std::move(eventString);
                // time related
                auto startTsNs = std::to_string(e.GetStartTimeNs());
                contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_START_TIME_NANO.size(), startTsNs.size());
                spanEventContentCache[i][3] = std::move(startTsNs);
                auto endTsNs = std::to_string(e.GetEndTimeNs());
                contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_END_TIME_NANO.size(), endTsNs.size());
                spanEventContentCache[i][4] = std::move(endTsNs);
                // NOTE(review): if the Ns getters return unsigned values, an end time earlier than the start
                // time wraps around here — confirm upstream guarantees end >= start.
                auto durationNs = std::to_string(e.GetEndTimeNs() - e.GetStartTimeNs());
                contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_DURATION.size(), durationNs.size());
                spanEventContentCache[i][5] = std::move(durationNs);
                logGroupSZ += GetLogSize(contentSZ, false, logSZ[i]);
            }
            break;
        }
        case PipelineEvent::Type::RAW: {
            for (size_t i = 0; i < group.mEvents.size(); ++i) {
                const auto& e = group.mEvents[i].Cast<RawEvent>();
                size_t contentSZ = GetLogContentSize(DEFAULT_CONTENT_KEY.size(), e.GetContent().size());
                logGroupSZ += GetLogSize(contentSZ, enableNs && e.GetTimestampNanosecond(), logSZ[i]);
            }
            break;
        }
        default:
            break;
    }
    if (logGroupSZ == 0) {
        errorMsg = "all empty logs";
        return false;
    }
    // loggroup.category is deprecated, no need to set
    for (const auto& tag : group.mTags.mInner) {
        if (tag.first == LOG_RESERVED_KEY_TOPIC || tag.first == LOG_RESERVED_KEY_SOURCE
            || tag.first == LOG_RESERVED_KEY_MACHINE_UUID) {
            logGroupSZ += GetStringSize(tag.second.size());
        } else {
            logGroupSZ += GetLogTagSize(tag.first.size(), tag.second.size());
        }
    }
    // Compare in 64 bits: narrowing logGroupSZ to int32_t would wrap a size above INT32_MAX to a negative
    // value and let an oversized group slip past the limit.
    if (static_cast<int64_t>(logGroupSZ) > static_cast<int64_t>(INT32_FLAG(max_send_log_group_size))) {
        errorMsg = "log group exceeds size limit\tgroup size: " + ToString(logGroupSZ)
            + "\tsize limit: " + ToString(INT32_FLAG(max_send_log_group_size));
        return false;
    }

    // Pass 2: write the logs using the sizes and strings cached above.
    thread_local LogGroupSerializer serializer;
    serializer.Prepare(logGroupSZ);
    switch (eventType) {
        case PipelineEvent::Type::LOG:
            for (size_t i = 0; i < group.mEvents.size(); ++i) {
                const auto& e = group.mEvents[i].Cast<LogEvent>();
                if (e.Empty()) {
                    continue;
                }
                serializer.StartToAddLog(logSZ[i]);
                serializer.AddLogTime(e.GetTimestamp());
                for (const auto& kv : e) {
                    serializer.AddLogContent(kv.first, kv.second);
                }
                if (enableNs && e.GetTimestampNanosecond()) {
                    serializer.AddLogTimeNs(e.GetTimestampNanosecond().value());
                }
            }
            break;
        case PipelineEvent::Type::METRIC:
            for (size_t i = 0; i < group.mEvents.size(); ++i) {
                auto& e = group.mEvents[i].Cast<MetricEvent>();
                // must mirror the discard conditions of pass 1
                if (!e.Is<UntypedSingleValue>() || e.GetTimestamp() < 1e9) {
                    continue;
                }
                serializer.StartToAddLog(logSZ[i]);
                serializer.AddLogTime(e.GetTimestamp());
                e.SortTags();
                serializer.AddLogContentMetricLabel(e, metricEventContentCache[i].second);
                serializer.AddLogContentMetricTimeNano(e);
                serializer.AddLogContent(METRIC_RESERVED_KEY_VALUE, metricEventContentCache[i].first);
                serializer.AddLogContent(METRIC_RESERVED_KEY_NAME, e.GetName());
            }
            break;
        case PipelineEvent::Type::SPAN:
            for (size_t i = 0; i < group.mEvents.size(); ++i) {
                const auto& spanEvent = group.mEvents[i].Cast<SpanEvent>();
                serializer.StartToAddLog(logSZ[i]);
                serializer.AddLogTime(spanEvent.GetTimestamp());
                // set trace_id span_id span_kind status etc
                serializer.AddLogContent(DEFAULT_TRACE_TAG_TRACE_ID, spanEvent.GetTraceId());
                serializer.AddLogContent(DEFAULT_TRACE_TAG_SPAN_ID, spanEvent.GetSpanId());
                serializer.AddLogContent(DEFAULT_TRACE_TAG_PARENT_ID, spanEvent.GetParentSpanId());
                // span_name
                serializer.AddLogContent(DEFAULT_TRACE_TAG_SPAN_NAME, spanEvent.GetName());
                // span_kind
                serializer.AddLogContent(DEFAULT_TRACE_TAG_SPAN_KIND, GetKindString(spanEvent.GetKind()));
                // status_code
                serializer.AddLogContent(DEFAULT_TRACE_TAG_STATUS_CODE, GetStatusString(spanEvent.GetStatus()));
                // trace state
                serializer.AddLogContent(DEFAULT_TRACE_TAG_TRACE_STATE, spanEvent.GetTraceState());
                serializer.AddLogContent(DEFAULT_TRACE_TAG_ATTRIBUTES, spanEventContentCache[i][0]);
                serializer.AddLogContent(DEFAULT_TRACE_TAG_LINKS, spanEventContentCache[i][1]);
                serializer.AddLogContent(DEFAULT_TRACE_TAG_EVENTS, spanEventContentCache[i][2]);
                // start_time
                serializer.AddLogContent(DEFAULT_TRACE_TAG_START_TIME_NANO, spanEventContentCache[i][3]);
                // end_time
                serializer.AddLogContent(DEFAULT_TRACE_TAG_END_TIME_NANO, spanEventContentCache[i][4]);
                // duration
                serializer.AddLogContent(DEFAULT_TRACE_TAG_DURATION, spanEventContentCache[i][5]);
            }
            break;
        case PipelineEvent::Type::RAW:
            for (size_t i = 0; i < group.mEvents.size(); ++i) {
                const auto& e = group.mEvents[i].Cast<RawEvent>();
                serializer.StartToAddLog(logSZ[i]);
                serializer.AddLogTime(e.GetTimestamp());
                serializer.AddLogContent(DEFAULT_CONTENT_KEY, e.GetContent());
                if (enableNs && e.GetTimestampNanosecond()) {
                    serializer.AddLogTimeNs(e.GetTimestampNanosecond().value());
                }
            }
            break;
        default:
            break;
    }
    for (const auto& tag : group.mTags.mInner) {
        if (tag.first == LOG_RESERVED_KEY_TOPIC) {
            serializer.AddTopic(tag.second);
        } else if (tag.first == LOG_RESERVED_KEY_SOURCE) {
            serializer.AddSource(tag.second);
        } else if (tag.first == LOG_RESERVED_KEY_MACHINE_UUID) {
            serializer.AddMachineUUID(tag.second);
        } else {
            serializer.AddLogTag(tag.first, tag.second);
        }
    }
    res = std::move(serializer.GetResult());
    // when function stabilizes, remove the following logic
    if (BOOL_FLAG(debug_sls_serializer)) {
        sls_logs::LogGroup logGroup;
        if (!logGroup.ParseFromString(res)) {
            JsonEventGroupSerializer ser(const_cast<Flusher*>(mFlusher));
            string jsonStr;
            // Keep the JSON serializer's own error out of the caller's errorMsg; it is not the failure cause.
            string jsonErrorMsg;
            ser.DoSerialize(std::move(group), jsonStr, jsonErrorMsg);
            LOG_ERROR(sLogger,
                      ("failed to parse log group", jsonStr)("config", mFlusher->GetContext().GetConfigName()));
            errorMsg = "failed to parse serialized log group";
            return false;
        }
    }
    return true;
}