core/collection_pipeline/serializer/JsonSerializer.cpp (110 lines of code) (raw):

// Copyright 2024 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include "collection_pipeline/serializer/JsonSerializer.h" #include "rapidjson/stringbuffer.h" #include "rapidjson/writer.h" using namespace std; namespace logtail { // Helper function to serialize common fields (tags and time) template <typename WriterType> void SerializeCommonFields(const SizedMap& tags, uint64_t timestamp, WriterType& writer) { // Serialize tags for (const auto& tag : tags.mInner) { writer.Key(tag.first.to_string().c_str()); writer.String(tag.second.to_string().c_str()); } // Serialize time writer.Key("__time__"); writer.Uint64(timestamp); } bool JsonEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, string& errorMsg) { if (group.mEvents.empty()) { errorMsg = "empty event group"; return false; } PipelineEvent::Type eventType = group.mEvents[0]->GetType(); if (eventType == PipelineEvent::Type::NONE) { // should not happen errorMsg = "unsupported event type in event group"; return false; } else if (eventType == PipelineEvent::Type::SPAN) { errorMsg = "invalid event type, span type is not yet supported"; return false; } // Create reusable StringBuffer and Writer rapidjson::StringBuffer jsonBuffer; rapidjson::Writer<rapidjson::StringBuffer> writer(jsonBuffer); auto resetBuffer = [&jsonBuffer, &writer]() { jsonBuffer.Clear(); // Clear the buffer for reuse writer.Reset(jsonBuffer); }; // TODO: should support nano second switch (eventType) { case PipelineEvent::Type::LOG: for (const auto& item : group.mEvents) { const auto& e = item.Cast<LogEvent>(); if (e.Empty()) { continue; } resetBuffer(); writer.StartObject(); SerializeCommonFields(group.mTags, e.GetTimestamp(), writer); // contents for (const auto& kv : e) { writer.Key(kv.first.to_string().c_str()); writer.String(kv.second.to_string().c_str()); } writer.EndObject(); res.append(jsonBuffer.GetString()); res.append("\n"); } break; case PipelineEvent::Type::METRIC: // TODO: key should support custom key for (const auto& item : group.mEvents) { const auto& e = item.Cast<MetricEvent>(); if (e.Is<std::monostate>()) { continue; } resetBuffer(); writer.StartObject(); SerializeCommonFields(group.mTags, e.GetTimestamp(), writer); // __labels__ writer.Key("__labels__"); writer.StartObject(); for (auto tag = e.TagsBegin(); tag != e.TagsEnd(); tag++) { writer.Key(tag->first.to_string().c_str()); writer.String(tag->second.to_string().c_str()); } writer.EndObject(); // __name__ writer.Key("__name__"); writer.String(e.GetName().to_string().c_str()); // __value__ writer.Key("__value__"); if (e.Is<UntypedSingleValue>()) { writer.Double(e.GetValue<UntypedSingleValue>()->mValue); } else if (e.Is<UntypedMultiDoubleValues>()) { writer.StartObject(); for (auto value = e.GetValue<UntypedMultiDoubleValues>()->ValuesBegin(); value != e.GetValue<UntypedMultiDoubleValues>()->ValuesEnd(); value++) { writer.Key(value->first.to_string().c_str()); writer.Double(value->second.Value); } writer.EndObject(); } writer.EndObject(); res.append(jsonBuffer.GetString()); res.append("\n"); } break; case PipelineEvent::Type::RAW: for (const auto& item : group.mEvents) { const auto& e = item.Cast<RawEvent>(); if (e.GetContent().empty()) { continue; } resetBuffer(); writer.StartObject(); SerializeCommonFields(group.mTags, e.GetTimestamp(), writer); // content writer.Key(DEFAULT_CONTENT_KEY.c_str()); writer.String(e.GetContent().to_string().c_str()); writer.EndObject(); res.append(jsonBuffer.GetString()); res.append("\n"); } break; default: break; } return !res.empty(); } } // namespace logtail