core/models/PipelineEventGroup.cpp (407 lines of code) (raw):

/* * Copyright 2023 iLogtail Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "models/PipelineEventGroup.h" #ifdef APSARA_UNIT_TEST_MAIN #include <sstream> #endif #include "common/HashUtil.h" #include "logger/Logger.h" #include "models/EventPool.h" #ifdef APSARA_UNIT_TEST_MAIN #include "plugin/processor/inner/ProcessorParseContainerLogNative.h" #endif using namespace std; namespace logtail { template <class T> void DestroyEvents(vector<PipelineEventPtr>&& events) { unordered_map<EventPool*, vector<T*>> eventsPoolMap; // for most cases, all events have the same origin. So we cache the pool pointer and iterator for efficiency EventPool* cachedPoolPtr = nullptr; typename unordered_map<EventPool*, vector<T*>>::iterator cachedIt; bool firstEvent = true; for (auto& item : events) { if (item && item.IsFromEventPool()) { item->Reset(); if (firstEvent || item.GetEventPool() != cachedPoolPtr) { cachedPoolPtr = item.GetEventPool(); cachedIt = eventsPoolMap.find(cachedPoolPtr); if (cachedIt == eventsPoolMap.end()) { eventsPoolMap.emplace(cachedPoolPtr, vector<T*>()); cachedIt = eventsPoolMap.find(cachedPoolPtr); cachedIt->second.reserve(events.size()); } firstEvent = false; } cachedIt->second.emplace_back(static_cast<T*>(item.Release())); } } for (auto& item : eventsPoolMap) { if (item.first) { item.first->Release(std::move(item.second)); } else { gThreadedEventPool.Release(std::move(item.second)); } } } PipelineEventGroup::PipelineEventGroup(PipelineEventGroup&& rhs) noexcept : mMetadata(std::move(rhs.mMetadata)), mTags(std::move(rhs.mTags)), mEvents(std::move(rhs.mEvents)), mSourceBuffer(std::move(rhs.mSourceBuffer)) { for (auto& item : mEvents) { item->ResetPipelineEventGroup(this); } } PipelineEventGroup::~PipelineEventGroup() { if (mEvents.empty() || !mEvents[0]) { return; } switch (mEvents[0]->GetType()) { case PipelineEvent::Type::LOG: DestroyEvents<LogEvent>(std::move(mEvents)); break; case PipelineEvent::Type::METRIC: DestroyEvents<MetricEvent>(std::move(mEvents)); break; case PipelineEvent::Type::SPAN: DestroyEvents<SpanEvent>(std::move(mEvents)); break; case PipelineEvent::Type::RAW: DestroyEvents<RawEvent>(std::move(mEvents)); break; default: break; } } PipelineEventGroup& PipelineEventGroup::operator=(PipelineEventGroup&& rhs) noexcept { if (this != &rhs) { mMetadata = std::move(rhs.mMetadata); mTags = std::move(rhs.mTags); mEvents = std::move(rhs.mEvents); mSourceBuffer = std::move(rhs.mSourceBuffer); for (auto& item : mEvents) { item->ResetPipelineEventGroup(this); } } return *this; } PipelineEventGroup PipelineEventGroup::Copy() const { PipelineEventGroup res(mSourceBuffer); res.mMetadata = mMetadata; res.mTags = mTags; res.mExactlyOnceCheckpoint = mExactlyOnceCheckpoint; for (auto& event : mEvents) { res.mEvents.emplace_back(event.Copy()); res.mEvents.back()->ResetPipelineEventGroup(&res); } return res; } unique_ptr<LogEvent> PipelineEventGroup::CreateLogEvent(bool fromPool, EventPool* pool) { LogEvent* e = nullptr; if (fromPool) { if (pool) { e = pool->AcquireLogEvent(this); } else { e = gThreadedEventPool.AcquireLogEvent(this); } } else { e = new LogEvent(this); } return unique_ptr<LogEvent>(e); } unique_ptr<MetricEvent> PipelineEventGroup::CreateMetricEvent(bool fromPool, EventPool* pool) { MetricEvent* e = nullptr; if (fromPool) { if (pool) { e = pool->AcquireMetricEvent(this); } else { e = gThreadedEventPool.AcquireMetricEvent(this); } } else { e = new MetricEvent(this); } return unique_ptr<MetricEvent>(e); } unique_ptr<SpanEvent> PipelineEventGroup::CreateSpanEvent(bool fromPool, EventPool* pool) { SpanEvent* e = nullptr; if (fromPool) { if (pool) { e = pool->AcquireSpanEvent(this); } else { e = gThreadedEventPool.AcquireSpanEvent(this); } } else { e = new SpanEvent(this); } return unique_ptr<SpanEvent>(e); } unique_ptr<RawEvent> PipelineEventGroup::CreateRawEvent(bool fromPool, EventPool* pool) { RawEvent* e = nullptr; if (fromPool) { if (pool) { e = pool->AcquireRawEvent(this); } else { e = gThreadedEventPool.AcquireRawEvent(this); } } else { e = new RawEvent(this); } return unique_ptr<RawEvent>(e); } LogEvent* PipelineEventGroup::AddLogEvent(bool fromPool, EventPool* pool) { LogEvent* e = nullptr; if (fromPool) { if (pool) { e = pool->AcquireLogEvent(this); } else { e = gThreadedEventPool.AcquireLogEvent(this); } } else { e = new LogEvent(this); } mEvents.emplace_back(e, fromPool, pool); return e; } MetricEvent* PipelineEventGroup::AddMetricEvent(bool fromPool, EventPool* pool) { MetricEvent* e = nullptr; if (fromPool) { if (pool) { e = pool->AcquireMetricEvent(this); } else { e = gThreadedEventPool.AcquireMetricEvent(this); } } else { e = new MetricEvent(this); } mEvents.emplace_back(e, fromPool, pool); return e; } SpanEvent* PipelineEventGroup::AddSpanEvent(bool fromPool, EventPool* pool) { SpanEvent* e = nullptr; if (fromPool) { if (pool) { e = pool->AcquireSpanEvent(this); } else { e = gThreadedEventPool.AcquireSpanEvent(this); } } else { e = new SpanEvent(this); } mEvents.emplace_back(e, fromPool, pool); return e; } RawEvent* PipelineEventGroup::AddRawEvent(bool fromPool, EventPool* pool) { RawEvent* e = nullptr; if (fromPool) { if (pool) { e = pool->AcquireRawEvent(this); } else { e = gThreadedEventPool.AcquireRawEvent(this); } } else { e = new RawEvent(this); } mEvents.emplace_back(e, fromPool, pool); return e; } void PipelineEventGroup::SetMetadata(EventGroupMetaKey key, StringView val) { SetMetadataNoCopy(key, mSourceBuffer->CopyString(val)); } void PipelineEventGroup::SetMetadata(EventGroupMetaKey key, const string& val) { SetMetadataNoCopy(key, mSourceBuffer->CopyString(val)); } void PipelineEventGroup::SetMetadataNoCopy(EventGroupMetaKey key, const StringBuffer& val) { SetMetadataNoCopy(key, StringView(val.data, val.size)); } bool PipelineEventGroup::HasMetadata(EventGroupMetaKey key) const { return mMetadata.find(key) != mMetadata.end(); } void PipelineEventGroup::SetMetadataNoCopy(EventGroupMetaKey key, StringView val) { mMetadata[key] = val; } StringView PipelineEventGroup::GetMetadata(EventGroupMetaKey key) const { auto it = mMetadata.find(key); if (it != mMetadata.end()) { return it->second; } return gEmptyStringView; } void PipelineEventGroup::DelMetadata(EventGroupMetaKey key) { mMetadata.erase(key); } void PipelineEventGroup::SetTag(StringView key, StringView val) { SetTagNoCopy(mSourceBuffer->CopyString(key), mSourceBuffer->CopyString(val)); } void PipelineEventGroup::SetTag(const string& key, const string& val) { SetTagNoCopy(mSourceBuffer->CopyString(key), mSourceBuffer->CopyString(val)); } void PipelineEventGroup::SetTag(const StringBuffer& key, StringView val) { SetTagNoCopy(key, mSourceBuffer->CopyString(val)); } void PipelineEventGroup::SetTagNoCopy(const StringBuffer& key, const StringBuffer& val) { SetTagNoCopy(StringView(key.data, key.size), StringView(val.data, val.size)); } bool PipelineEventGroup::HasTag(StringView key) const { return mTags.mInner.find(key) != mTags.mInner.end(); } void PipelineEventGroup::SetTagNoCopy(StringView key, StringView val) { mTags.Insert(key, val); } StringView PipelineEventGroup::GetTag(StringView key) const { auto it = mTags.mInner.find(key); if (it != mTags.mInner.end()) { return it->second; } return gEmptyStringView; } void PipelineEventGroup::DelTag(StringView key) { mTags.Erase(key); } size_t PipelineEventGroup::GetTagsHash() const { size_t seed = 0; for (const auto& item : mTags.mInner) { HashCombine(seed, hash<string>{}(item.first.to_string())); HashCombine(seed, hash<string>{}(item.second.to_string())); } HashCombine(seed, hash<string>{}(GetMetadata(EventGroupMetaKey::SOURCE_ID).to_string())); return seed; } size_t PipelineEventGroup::DataSize() const { size_t eventsSize = sizeof(decltype(mEvents)); for (const auto& item : mEvents) { eventsSize += item->DataSize(); } return eventsSize + mTags.DataSize(); } bool PipelineEventGroup::IsReplay() const { return mExactlyOnceCheckpoint != nullptr && mExactlyOnceCheckpoint->IsComplete(); } #ifdef APSARA_UNIT_TEST_MAIN const string EVENT_GROUP_META_LOG_FILE_PATH_RESOLVED = "log.file.path_resolved"; const string EVENT_GROUP_META_LOG_FILE_INODE = "log.file.inode"; const string EVENT_GROUP_META_CONTAINER_TYPE = "container.type"; const string EVENT_GROUP_META_HAS_PART_LOG = "has.part.log"; const string EVENT_GROUP_META_LOG_FILE_OFFSET = "log.file.offset"; const string EVENT_GROUP_META_K8S_CLUSTER_ID = "k8s.cluster.id"; const string EVENT_GROUP_META_K8S_NODE_NAME = "k8s.node.name"; const string EVENT_GROUP_META_K8S_NODE_IP = "k8s.node.ip"; const string EVENT_GROUP_META_K8S_NAMESPACE = "k8s.namespace.name"; const string EVENT_GROUP_META_K8S_POD_UID = "k8s.pod.uid"; const string EVENT_GROUP_META_K8S_POD_NAME = "k8s.pod.name"; const string EVENT_GROUP_META_CONTAINER_NAME = "container.name"; const string EVENT_GROUP_META_CONTAINER_IP = "container.ip"; const string EVENT_GROUP_META_CONTAINER_IMAGE_NAME = "container.image.name"; const string EVENT_GROUP_META_CONTAINER_IMAGE_ID = "container.image.id"; const string EVENT_GROUP_META_CONTAINERD_TEXT = "containerd_text"; const string EVENT_GROUP_META_DOCKER_JSON_FILE = "docker_json-file"; const string EVENT_GROUP_META_SOURCE_ID = "source.id"; const string& EventGroupMetaKeyToString(EventGroupMetaKey key) { switch (key) { case EventGroupMetaKey::LOG_FILE_PATH_RESOLVED: return EVENT_GROUP_META_LOG_FILE_PATH_RESOLVED; case EventGroupMetaKey::SOURCE_ID: return EVENT_GROUP_META_SOURCE_ID; case EventGroupMetaKey::LOG_FORMAT: return EVENT_GROUP_META_CONTAINER_TYPE; case EventGroupMetaKey::HAS_PART_LOG: return EVENT_GROUP_META_HAS_PART_LOG; case EventGroupMetaKey::LOG_FILE_OFFSET_KEY: return EVENT_GROUP_META_LOG_FILE_OFFSET; default: static string sEmpty = "unknown"; return sEmpty; } } const string EventGroupMetaValueToString(string value) { if (value == ProcessorParseContainerLogNative::CONTAINERD_TEXT) { return EVENT_GROUP_META_CONTAINERD_TEXT; } else if (value == ProcessorParseContainerLogNative::DOCKER_JSON_FILE) { return EVENT_GROUP_META_DOCKER_JSON_FILE; } return value; } EventGroupMetaKey StringToEventGroupMetaKey(const string& key) { static unordered_map<string, EventGroupMetaKey> sStringToEnum{ {EVENT_GROUP_META_LOG_FILE_PATH_RESOLVED, EventGroupMetaKey::LOG_FILE_PATH_RESOLVED}, {EVENT_GROUP_META_SOURCE_ID, EventGroupMetaKey::SOURCE_ID}, {EVENT_GROUP_META_HAS_PART_LOG, EventGroupMetaKey::HAS_PART_LOG}}; auto it = sStringToEnum.find(key); if (it != sStringToEnum.end()) { return it->second; } return EventGroupMetaKey::UNKNOWN; } Json::Value PipelineEventGroup::ToJson(bool enableEventMeta) const { Json::Value root; if (!mMetadata.empty()) { Json::Value metadata; for (const auto& meta : mMetadata) { metadata[EventGroupMetaKeyToString(meta.first)] = EventGroupMetaValueToString(meta.second.to_string()); } root["metadata"] = metadata; } if (!mTags.mInner.empty()) { Json::Value tags; for (const auto& tag : mTags.mInner) { tags[tag.first.to_string()] = tag.second.to_string(); } root["tags"] = tags; } if (!this->GetEvents().empty()) { Json::Value events; for (const auto& event : this->GetEvents()) { events.append(event->ToJson(enableEventMeta)); } root["events"] = std::move(events); } return root; } bool PipelineEventGroup::FromJson(const Json::Value& root) { if (root.isMember("metadata")) { Json::Value metadata = root["metadata"]; for (const auto& key : metadata.getMemberNames()) { SetMetadata(StringToEventGroupMetaKey(key), metadata[key].asString()); } } if (root.isMember("tags")) { Json::Value tags = root["tags"]; for (const auto& key : tags.getMemberNames()) { SetTag(key, tags[key].asString()); } } if (root.isMember("events")) { Json::Value events = root["events"]; for (const auto& event : events) { if (event["type"].asInt() == static_cast<int>(PipelineEvent::Type::LOG)) { AddLogEvent()->FromJson(event); } else if (event["type"].asInt() == static_cast<int>(PipelineEvent::Type::METRIC)) { AddMetricEvent()->FromJson(event); } else if (event["type"].asInt() == static_cast<int>(PipelineEvent::Type::SPAN)) { AddSpanEvent()->FromJson(event); } else { AddRawEvent()->FromJson(event); } } } return true; } string PipelineEventGroup::ToJsonString(bool enableEventMeta) const { Json::Value root = ToJson(enableEventMeta); Json::StreamWriterBuilder builder; builder["commentStyle"] = "None"; unique_ptr<Json::StreamWriter> writer(builder.newStreamWriter()); ostringstream oss; writer->write(root, &oss); return oss.str(); } bool PipelineEventGroup::FromJsonString(const string& inJson) { Json::CharReaderBuilder builder; builder["collectComments"] = false; unique_ptr<Json::CharReader> reader(builder.newCharReader()); string errs; Json::Value root; if (!reader->parse(inJson.data(), inJson.data() + inJson.size(), &root, &errs)) { LOG_ERROR(sLogger, ("build PipelineEventGroup FromJsonString error", errs)); return false; } return FromJson(root); } #endif } // namespace logtail