core/models/PipelineEventGroup.h (98 lines of code) (raw):
/*
* Copyright 2023 iLogtail Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <memory>
#include <string>
#include "checkpoint/RangeCheckpoint.h"
#include "common/memory/SourceBuffer.h"
#include "constants/Constants.h"
#include "models/PipelineEventPtr.h"
namespace logtail {
class EventPool;
// references
// https://opentelemetry.io/docs/specs/otel/logs/data-model-appendix/#elastic-common-schema
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/resource/README.md
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/general/logs.md
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/resource/k8s.md
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/resource/container.md
enum class EventGroupMetaKey {
UNKNOWN,
LOG_FILE_PATH_RESOLVED,
LOG_FORMAT,
LOG_FILE_OFFSET_KEY,
HAS_PART_LOG,
K8S_CLUSTER_ID,
K8S_NODE_NAME,
K8S_NODE_IP,
K8S_NAMESPACE,
K8S_POD_UID,
K8S_POD_NAME,
CONTAINER_NAME,
CONTAINER_IP,
CONTAINER_IMAGE_NAME,
CONTAINER_IMAGE_ID,
PROMETHEUS_SCRAPE_STATE,
PROMETHEUS_SCRAPE_DURATION,
PROMETHEUS_SCRAPE_RESPONSE_SIZE,
PROMETHEUS_SAMPLES_SCRAPED,
PROMETHEUS_SCRAPE_TIMESTAMP_MILLISEC,
PROMETHEUS_UP_STATE,
PROMETHEUS_STREAM_ID,
PROMETHEUS_STREAM_TOTAL,
INTERNAL_DATA_TARGET_REGION,
INTERNAL_DATA_TYPE,
SOURCE_ID
};
using GroupMetadata = std::map<EventGroupMetaKey, StringView>;
using GroupTags = std::map<StringView, StringView>;
// DeepCopy is required if we want to support no-linear topology
// We cannot just use default copy constructor as it won't deep copy PipelineEvent pointed in Events vector.
using EventsContainer = std::vector<PipelineEventPtr>;
// only movable
class PipelineEventGroup {
public:
PipelineEventGroup(const std::shared_ptr<SourceBuffer>& sourceBuffer) : mSourceBuffer(sourceBuffer) {}
~PipelineEventGroup();
PipelineEventGroup(PipelineEventGroup&&) noexcept;
PipelineEventGroup& operator=(PipelineEventGroup&&) noexcept;
PipelineEventGroup Copy() const;
std::unique_ptr<LogEvent> CreateLogEvent(bool fromPool = false, EventPool* pool = nullptr);
std::unique_ptr<MetricEvent> CreateMetricEvent(bool fromPool = false, EventPool* pool = nullptr);
std::unique_ptr<SpanEvent> CreateSpanEvent(bool fromPool = false, EventPool* pool = nullptr);
std::unique_ptr<RawEvent> CreateRawEvent(bool fromPool = false, EventPool* pool = nullptr);
const EventsContainer& GetEvents() const { return mEvents; }
EventsContainer& MutableEvents() { return mEvents; }
LogEvent* AddLogEvent(bool fromPool = false, EventPool* pool = nullptr);
MetricEvent* AddMetricEvent(bool fromPool = false, EventPool* pool = nullptr);
SpanEvent* AddSpanEvent(bool fromPool = false, EventPool* pool = nullptr);
RawEvent* AddRawEvent(bool fromPool = false, EventPool* pool = nullptr);
void SwapEvents(EventsContainer& other) { mEvents.swap(other); }
void ReserveEvents(size_t size) { mEvents.reserve(size); }
std::shared_ptr<SourceBuffer>& GetSourceBuffer() { return mSourceBuffer; }
void SetMetadata(EventGroupMetaKey key, StringView val);
void SetMetadata(EventGroupMetaKey key, const std::string& val);
void SetMetadataNoCopy(EventGroupMetaKey key, const StringBuffer& val);
StringView GetMetadata(EventGroupMetaKey key) const;
const GroupMetadata& GetAllMetadata() const { return mMetadata; };
bool HasMetadata(EventGroupMetaKey key) const;
void SetMetadataNoCopy(EventGroupMetaKey key, StringView val);
void DelMetadata(EventGroupMetaKey key);
void SetAllMetadata(const GroupMetadata& other) { mMetadata = other; }
void SetTag(StringView key, StringView val);
void SetTag(const std::string& key, const std::string& val);
void SetTag(const StringBuffer& key, StringView val);
void SetTagNoCopy(const StringBuffer& key, const StringBuffer& val);
StringView GetTag(StringView key) const;
const GroupTags& GetTags() const { return mTags.mInner; };
SizedMap& GetSizedTags() { return mTags; };
bool HasTag(StringView key) const;
void SetTagNoCopy(StringView key, StringView val);
void DelTag(StringView key);
size_t GetTagsHash() const;
void SetExactlyOnceCheckpoint(const RangeCheckpointPtr& checkpoint) { mExactlyOnceCheckpoint = checkpoint; }
RangeCheckpointPtr& GetExactlyOnceCheckpoint() { return mExactlyOnceCheckpoint; }
bool IsReplay() const;
size_t DataSize() const;
#ifdef APSARA_UNIT_TEST_MAIN
// for debug and test
Json::Value ToJson(bool enableEventMeta = false) const;
bool FromJson(const Json::Value&);
std::string ToJsonString(bool enableEventMeta = false) const;
bool FromJsonString(const std::string&);
#endif
private:
GroupMetadata mMetadata; // Used to generate tag/log. Will not output.
SizedMap mTags; // custom tags to output
EventsContainer mEvents;
std::shared_ptr<SourceBuffer> mSourceBuffer;
RangeCheckpointPtr mExactlyOnceCheckpoint;
};
} // namespace logtail