pluginmanager/plugin_wrapper.go (82 lines of code) (raw):

// Copyright 2024 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package pluginmanager import ( "time" "github.com/alibaba/ilogtail/pkg/pipeline" "github.com/alibaba/ilogtail/pkg/selfmonitor" ) /*--------------------- Plugin Input The input plugin is used for reading data. ---------------------*/ type InputWrapper struct { pipeline.PluginContext Config *LogstoreConfig Tags map[string]string Interval time.Duration outEventsTotal selfmonitor.CounterMetric outEventGroupsTotal selfmonitor.CounterMetric outSizeBytes selfmonitor.CounterMetric } func (wrapper *InputWrapper) InitMetricRecord(pluginMeta *pipeline.PluginMeta) { labels := pipeline.GetPluginCommonLabels(wrapper.Config.Context, pluginMeta) wrapper.MetricRecord = wrapper.Config.Context.RegisterMetricRecord(labels) wrapper.outEventsTotal = selfmonitor.NewCounterMetricAndRegister(wrapper.MetricRecord, selfmonitor.MetricPluginOutEventsTotal) wrapper.outEventGroupsTotal = selfmonitor.NewCounterMetricAndRegister(wrapper.MetricRecord, selfmonitor.MetricPluginOutEventGroupsTotal) wrapper.outSizeBytes = selfmonitor.NewCounterMetricAndRegister(wrapper.MetricRecord, selfmonitor.MetricPluginOutSizeBytes) } // The service plugin is an input plugin used for passively receiving data. type ServiceWrapper struct { InputWrapper } // metric plugin is an input plugin used for actively pulling data. type MetricWrapper struct { InputWrapper } /*--------------------- Plugin Processor The processor plugin is used for reading data. ---------------------*/ type ProcessorWrapper struct { pipeline.PluginContext Config *LogstoreConfig inEventsTotal selfmonitor.CounterMetric inSizeBytes selfmonitor.CounterMetric outEventsTotal selfmonitor.CounterMetric outSizeBytes selfmonitor.CounterMetric totalProcessTimeMs selfmonitor.CounterMetric } func (wrapper *ProcessorWrapper) InitMetricRecord(pluginMeta *pipeline.PluginMeta) { labels := pipeline.GetPluginCommonLabels(wrapper.Config.Context, pluginMeta) wrapper.MetricRecord = wrapper.Config.Context.RegisterMetricRecord(labels) wrapper.inEventsTotal = selfmonitor.NewCounterMetricAndRegister(wrapper.MetricRecord, selfmonitor.MetricPluginInEventsTotal) wrapper.inSizeBytes = selfmonitor.NewCounterMetricAndRegister(wrapper.MetricRecord, selfmonitor.MetricPluginInSizeBytes) wrapper.outEventsTotal = selfmonitor.NewCounterMetricAndRegister(wrapper.MetricRecord, selfmonitor.MetricPluginOutEventsTotal) wrapper.outSizeBytes = selfmonitor.NewCounterMetricAndRegister(wrapper.MetricRecord, selfmonitor.MetricPluginOutSizeBytes) wrapper.totalProcessTimeMs = selfmonitor.NewCounterMetricAndRegister(wrapper.MetricRecord, selfmonitor.MetricPluginTotalProcessTimeMs) } /*--------------------- Plugin Aggregator The aggregator plugin is used for aggregating data. ---------------------*/ type AggregatorWrapper struct { pipeline.PluginContext Config *LogstoreConfig Interval time.Duration outEventsTotal selfmonitor.CounterMetric outEventGroupsTotal selfmonitor.CounterMetric outSizeBytes selfmonitor.CounterMetric } func (wrapper *AggregatorWrapper) InitMetricRecord(pluginMeta *pipeline.PluginMeta) { labels := pipeline.GetPluginCommonLabels(wrapper.Config.Context, pluginMeta) wrapper.MetricRecord = wrapper.Config.Context.RegisterMetricRecord(labels) wrapper.outEventsTotal = selfmonitor.NewCounterMetricAndRegister(wrapper.MetricRecord, selfmonitor.MetricPluginOutEventsTotal) wrapper.outEventGroupsTotal = selfmonitor.NewCounterMetricAndRegister(wrapper.MetricRecord, selfmonitor.MetricPluginOutEventGroupsTotal) wrapper.outSizeBytes = selfmonitor.NewCounterMetricAndRegister(wrapper.MetricRecord, selfmonitor.MetricPluginOutSizeBytes) } /*--------------------- Plugin Flusher The flusher plugin is used for sending data. ---------------------*/ type FlusherWrapperInterface interface { Init(pluginMeta *pipeline.PluginMeta) error IsReady(projectName string, logstoreName string, logstoreKey int64) bool } type FlusherWrapper struct { pipeline.PluginContext Config *LogstoreConfig Interval time.Duration inEventsTotal selfmonitor.CounterMetric inEventGroupsTotal selfmonitor.CounterMetric inSizeBytes selfmonitor.CounterMetric totalDelayTimeMs selfmonitor.CounterMetric } func (wrapper *FlusherWrapper) InitMetricRecord(pluginMeta *pipeline.PluginMeta) { labels := pipeline.GetPluginCommonLabels(wrapper.Config.Context, pluginMeta) wrapper.MetricRecord = wrapper.Config.Context.RegisterMetricRecord(labels) wrapper.inEventsTotal = selfmonitor.NewCounterMetricAndRegister(wrapper.MetricRecord, selfmonitor.MetricPluginInEventsTotal) wrapper.inEventGroupsTotal = selfmonitor.NewCounterMetricAndRegister(wrapper.MetricRecord, selfmonitor.MetricPluginInEventGroupsTotal) wrapper.inSizeBytes = selfmonitor.NewCounterMetricAndRegister(wrapper.MetricRecord, selfmonitor.MetricPluginInSizeBytes) wrapper.totalDelayTimeMs = selfmonitor.NewCounterMetricAndRegister(wrapper.MetricRecord, selfmonitor.MetricPluginTotalDelayMs) }