bool SLSEventGroupSerializer::Serialize()

in core/collection_pipeline/serializer/SLSSerializer.cpp [84:332]


/**
 * Serialize a batch of pipeline events into an SLS LogGroup byte string.
 *
 * The work is done in two passes over the events:
 *   1. compute the encoded size of every log and of the whole LogGroup, caching per-log sizes and
 *      strings that are costly to build (metric values, span attributes/links/events/timestamps);
 *   2. write every log into a LogGroupSerializer prepared with the exact size from pass 1.
 * Every event is cast to the type of the first event, so the group is expected to be homogeneous.
 *
 * @param group    events plus group-level tags. Metric events are mutated (their tags are sorted);
 *                 the group is moved from only on the debug_sls_serializer parse-failure path.
 * @param res      receives the serialized LogGroup bytes.
 * @param errorMsg receives a description of the failure whenever false is returned.
 * @return true on success; false if the group is empty, its event type is NONE, every event
 *         contributed zero size (e.g. all logs empty / all metrics discarded), the encoded size
 *         exceeds max_send_log_group_size, or (debug mode) the result fails to parse back.
 */
bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, string& errorMsg) {
    if (group.mEvents.empty()) {
        errorMsg = "empty event group";
        return false;
    }

    PipelineEvent::Type eventType = group.mEvents[0]->GetType();
    if (eventType == PipelineEvent::Type::NONE) {
        // should not happen
        errorMsg = "unsupported event type in event group";
        return false;
    }

    bool enableNs = mFlusher->GetContext().GetGlobalConfig().mEnableTimestampNanosecond;

    // Pass 1: calculate the serialized LogGroup size first, caching critical intermediate results.
    // logSZ[i] stays 0 for skipped events; pass 2 skips exactly the same events.
    vector<size_t> logSZ(group.mEvents.size());
    // The content caches are only sized for the event type that actually uses them.
    // metric: (value string, label size from GetMetricLabelSize)
    vector<pair<string, size_t>> metricEventContentCache;
    // span: attributes, links, events, start time, end time, duration
    vector<array<string, 6>> spanEventContentCache;
    size_t logGroupSZ = 0;
    switch (eventType) {
        case PipelineEvent::Type::LOG: {
            for (size_t i = 0; i < group.mEvents.size(); ++i) {
                const auto& e = group.mEvents[i].Cast<LogEvent>();
                if (e.Empty()) {
                    continue;
                }
                size_t contentSZ = 0;
                for (const auto& kv : e) {
                    contentSZ += GetLogContentSize(kv.first.size(), kv.second.size());
                }
                logGroupSZ += GetLogSize(contentSZ, enableNs && e.GetTimestampNanosecond(), logSZ[i]);
            }
            break;
        }
        case PipelineEvent::Type::METRIC: {
            metricEventContentCache.resize(group.mEvents.size());
            for (size_t i = 0; i < group.mEvents.size(); ++i) {
                const auto& e = group.mEvents[i].Cast<MetricEvent>();
                if (e.GetTimestamp() < 1e9) {
                    LOG_WARNING(sLogger,
                                ("metric event timestamp is less than 1e9", "discard event")(
                                    "timestamp", e.GetTimestamp())("config", mFlusher->GetContext().GetConfigName()));
                    continue;
                }
                if (e.Is<UntypedSingleValue>()) {
                    metricEventContentCache[i].first = to_string(e.GetValue<UntypedSingleValue>()->mValue);
                } else {
                    // untyped multi value is not supported
                    LOG_WARNING(sLogger,
                                ("invalid metric event type", "discard event")("config",
                                                                               mFlusher->GetContext().GetConfigName()));
                    continue;
                }
                metricEventContentCache[i].second = GetMetricLabelSize(e);

                size_t contentSZ = 0;
                contentSZ += GetLogContentSize(METRIC_RESERVED_KEY_NAME.size(), e.GetName().size());
                contentSZ
                    += GetLogContentSize(METRIC_RESERVED_KEY_VALUE.size(), metricEventContentCache[i].first.size());
                contentSZ
                    += GetLogContentSize(METRIC_RESERVED_KEY_TIME_NANO.size(), e.GetTimestampNanosecond() ? 19U : 10U);
                contentSZ += GetLogContentSize(METRIC_RESERVED_KEY_LABELS.size(), metricEventContentCache[i].second);
                logGroupSZ += GetLogSize(contentSZ, false, logSZ[i]);
            }
            break;
        }
        case PipelineEvent::Type::SPAN: {
            spanEventContentCache.resize(group.mEvents.size());
            // The JSON writer configuration is the same for every span, so build it only once.
            Json::StreamWriterBuilder writer;
            for (size_t i = 0; i < group.mEvents.size(); ++i) {
                const auto& e = group.mEvents[i].Cast<SpanEvent>();
                size_t contentSZ = 0;
                contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_TRACE_ID.size(), e.GetTraceId().size());
                contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_SPAN_ID.size(), e.GetSpanId().size());
                contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_PARENT_ID.size(), e.GetParentSpanId().size());
                contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_SPAN_NAME.size(), e.GetName().size());
                contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_SPAN_KIND.size(), GetKindString(e.GetKind()).size());
                contentSZ
                    += GetLogContentSize(DEFAULT_TRACE_TAG_STATUS_CODE.size(), GetStatusString(e.GetStatus()).size());
                contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_TRACE_STATE.size(), e.GetTraceState().size());

                // tags and scope tags are merged into a single JSON object (scope tags win on key clash)
                Json::Value jsonVal;
                for (auto it = e.TagsBegin(); it != e.TagsEnd(); ++it) {
                    jsonVal[it->first.to_string()] = it->second.to_string();
                }
                for (auto it = e.ScopeTagsBegin(); it != e.ScopeTagsEnd(); ++it) {
                    jsonVal[it->first.to_string()] = it->second.to_string();
                }
                std::string attrString = Json::writeString(writer, jsonVal);
                contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_ATTRIBUTES.size(), attrString.size());
                spanEventContentCache[i][0] = std::move(attrString);
                auto linkString = SerializeSpanLinksToString(e);
                contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_LINKS.size(), linkString.size());
                spanEventContentCache[i][1] = std::move(linkString);
                auto eventString = SerializeSpanEventsToString(e);
                contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_EVENTS.size(), eventString.size());
                spanEventContentCache[i][2] = std::move(eventString);

                // time related
                auto startTsNs = std::to_string(e.GetStartTimeNs());
                contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_START_TIME_NANO.size(), startTsNs.size());
                spanEventContentCache[i][3] = std::move(startTsNs);
                auto endTsNs = std::to_string(e.GetEndTimeNs());
                contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_END_TIME_NANO.size(), endTsNs.size());
                spanEventContentCache[i][4] = std::move(endTsNs);
                // NOTE(review): if the Get*TimeNs() accessors return unsigned values, an end time
                // earlier than the start time wraps around here -- confirm against SpanEvent.
                auto durationNs = std::to_string(e.GetEndTimeNs() - e.GetStartTimeNs());
                contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_DURATION.size(), durationNs.size());
                spanEventContentCache[i][5] = std::move(durationNs);
                logGroupSZ += GetLogSize(contentSZ, false, logSZ[i]);
            }
            break;
        }
        case PipelineEvent::Type::RAW:
            for (size_t i = 0; i < group.mEvents.size(); ++i) {
                const auto& e = group.mEvents[i].Cast<RawEvent>();
                size_t contentSZ = GetLogContentSize(DEFAULT_CONTENT_KEY.size(), e.GetContent().size());
                logGroupSZ += GetLogSize(contentSZ, enableNs && e.GetTimestampNanosecond(), logSZ[i]);
            }
            break;
        default:
            break;
    }
    if (logGroupSZ == 0) {
        errorMsg = "all empty logs";
        return false;
    }

    // loggroup.category is deprecated, no need to set
    // topic / source / machine uuid are dedicated LogGroup fields; every other tag is a LogTag.
    for (const auto& tag : group.mTags.mInner) {
        if (tag.first == LOG_RESERVED_KEY_TOPIC || tag.first == LOG_RESERVED_KEY_SOURCE
            || tag.first == LOG_RESERVED_KEY_MACHINE_UUID) {
            logGroupSZ += GetStringSize(tag.second.size());
        } else {
            logGroupSZ += GetLogTagSize(tag.first.size(), tag.second.size());
        }
    }

    // Compare in the unsigned domain: narrowing a size_t above INT32_MAX to int32_t wraps to a
    // negative value and would let an oversized group slip past the limit. A negative limit still
    // rejects every group, exactly as the signed comparison did.
    const int32_t sizeLimit = INT32_FLAG(max_send_log_group_size);
    if (sizeLimit < 0 || logGroupSZ > static_cast<size_t>(sizeLimit)) {
        errorMsg = "log group exceeds size limit\tgroup size: " + ToString(logGroupSZ)
            + "\tsize limit: " + ToString(sizeLimit);
        return false;
    }

    // Pass 2: write the bytes, reusing the sizes and strings cached in pass 1.
    thread_local LogGroupSerializer serializer;
    serializer.Prepare(logGroupSZ);
    switch (eventType) {
        case PipelineEvent::Type::LOG:
            for (size_t i = 0; i < group.mEvents.size(); ++i) {
                const auto& e = group.mEvents[i].Cast<LogEvent>();
                if (e.Empty()) {
                    continue;
                }
                serializer.StartToAddLog(logSZ[i]);
                serializer.AddLogTime(e.GetTimestamp());
                for (const auto& kv : e) {
                    serializer.AddLogContent(kv.first, kv.second);
                }
                if (enableNs && e.GetTimestampNanosecond()) {
                    serializer.AddLogTimeNs(e.GetTimestampNanosecond().value());
                }
            }
            break;
        case PipelineEvent::Type::METRIC:
            for (size_t i = 0; i < group.mEvents.size(); ++i) {
                auto& e = group.mEvents[i].Cast<MetricEvent>();
                // same skip conditions as pass 1, so logSZ[i] / the cache entry are valid here
                if (!e.Is<UntypedSingleValue>() || e.GetTimestamp() < 1e9) {
                    continue;
                }
                serializer.StartToAddLog(logSZ[i]);
                serializer.AddLogTime(e.GetTimestamp());
                e.SortTags();
                serializer.AddLogContentMetricLabel(e, metricEventContentCache[i].second);
                serializer.AddLogContentMetricTimeNano(e);
                serializer.AddLogContent(METRIC_RESERVED_KEY_VALUE, metricEventContentCache[i].first);
                serializer.AddLogContent(METRIC_RESERVED_KEY_NAME, e.GetName());
            }
            break;
        case PipelineEvent::Type::SPAN:
            for (size_t i = 0; i < group.mEvents.size(); ++i) {
                const auto& spanEvent = group.mEvents[i].Cast<SpanEvent>();

                serializer.StartToAddLog(logSZ[i]);
                serializer.AddLogTime(spanEvent.GetTimestamp());
                // set trace_id span_id span_kind status etc
                serializer.AddLogContent(DEFAULT_TRACE_TAG_TRACE_ID, spanEvent.GetTraceId());
                serializer.AddLogContent(DEFAULT_TRACE_TAG_SPAN_ID, spanEvent.GetSpanId());
                serializer.AddLogContent(DEFAULT_TRACE_TAG_PARENT_ID, spanEvent.GetParentSpanId());
                // span_name
                serializer.AddLogContent(DEFAULT_TRACE_TAG_SPAN_NAME, spanEvent.GetName());
                // span_kind
                serializer.AddLogContent(DEFAULT_TRACE_TAG_SPAN_KIND, GetKindString(spanEvent.GetKind()));
                // status_code
                serializer.AddLogContent(DEFAULT_TRACE_TAG_STATUS_CODE, GetStatusString(spanEvent.GetStatus()));
                // trace state
                serializer.AddLogContent(DEFAULT_TRACE_TAG_TRACE_STATE, spanEvent.GetTraceState());

                serializer.AddLogContent(DEFAULT_TRACE_TAG_ATTRIBUTES, spanEventContentCache[i][0]);

                serializer.AddLogContent(DEFAULT_TRACE_TAG_LINKS, spanEventContentCache[i][1]);
                serializer.AddLogContent(DEFAULT_TRACE_TAG_EVENTS, spanEventContentCache[i][2]);

                // start_time
                serializer.AddLogContent(DEFAULT_TRACE_TAG_START_TIME_NANO, spanEventContentCache[i][3]);
                // end_time
                serializer.AddLogContent(DEFAULT_TRACE_TAG_END_TIME_NANO, spanEventContentCache[i][4]);
                // duration
                serializer.AddLogContent(DEFAULT_TRACE_TAG_DURATION, spanEventContentCache[i][5]);
            }
            break;
        case PipelineEvent::Type::RAW:
            for (size_t i = 0; i < group.mEvents.size(); ++i) {
                const auto& e = group.mEvents[i].Cast<RawEvent>();
                serializer.StartToAddLog(logSZ[i]);
                serializer.AddLogTime(e.GetTimestamp());
                serializer.AddLogContent(DEFAULT_CONTENT_KEY, e.GetContent());
                if (enableNs && e.GetTimestampNanosecond()) {
                    serializer.AddLogTimeNs(e.GetTimestampNanosecond().value());
                }
            }
            break;
        default:
            break;
    }
    for (const auto& tag : group.mTags.mInner) {
        if (tag.first == LOG_RESERVED_KEY_TOPIC) {
            serializer.AddTopic(tag.second);
        } else if (tag.first == LOG_RESERVED_KEY_SOURCE) {
            serializer.AddSource(tag.second);
        } else if (tag.first == LOG_RESERVED_KEY_MACHINE_UUID) {
            serializer.AddMachineUUID(tag.second);
        } else {
            serializer.AddLogTag(tag.first, tag.second);
        }
    }
    res = std::move(serializer.GetResult());

    // when function stablize, remove the following logic
    // Debug self-check: parse the produced bytes back and dump the group as JSON on failure.
    if (BOOL_FLAG(debug_sls_serializer)) {
        sls_logs::LogGroup logGroup;
        if (!logGroup.ParseFromString(res)) {
            JsonEventGroupSerializer ser(const_cast<Flusher*>(mFlusher));
            string jsonStr;
            ser.DoSerialize(std::move(group), jsonStr, errorMsg);
            LOG_ERROR(sLogger,
                      ("failed to parse log group", jsonStr)("config", mFlusher->GetContext().GetConfigName()));
            // Report the actual failure rather than whatever DoSerialize left in errorMsg
            // (possibly nothing), so callers never see false with an empty error.
            errorMsg = "failed to parse serialized log group";
            return false;
        }
    }
    return true;
}