pluginmanager/plugin_wrapper_processor_v2.go (34 lines of code) (raw):
// Copyright 2024 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pluginmanager
import (
"time"
"github.com/alibaba/ilogtail/pkg/models"
"github.com/alibaba/ilogtail/pkg/pipeline"
"github.com/alibaba/ilogtail/pkg/selfmonitor"
)
type ProcessorWrapperV2 struct {
ProcessorWrapper
Processor pipeline.ProcessorV2
inEventGroupsTotal selfmonitor.CounterMetric
outEventGroupsTotal selfmonitor.CounterMetric
}
func (wrapper *ProcessorWrapperV2) Init(pluginMeta *pipeline.PluginMeta) error {
wrapper.InitMetricRecord(pluginMeta)
wrapper.inEventGroupsTotal = selfmonitor.NewCounterMetricAndRegister(wrapper.MetricRecord, selfmonitor.MetricPluginInEventGroupsTotal)
wrapper.outEventGroupsTotal = selfmonitor.NewCounterMetricAndRegister(wrapper.MetricRecord, selfmonitor.MetricPluginOutEventGroupsTotal)
return wrapper.Processor.Init(wrapper.Config.Context)
}
func (wrapper *ProcessorWrapperV2) Process(in *models.PipelineGroupEvents, context pipeline.PipelineContext) {
startTime := time.Now().UnixMilli()
wrapper.inEventGroupsTotal.Add(1)
wrapper.inEventsTotal.Add(int64(len(in.Events)))
for _, event := range in.Events {
wrapper.inSizeBytes.Add(event.GetSize())
}
wrapper.Processor.Process(in, context)
wrapper.outEventGroupsTotal.Add(1)
wrapper.outEventsTotal.Add(int64(len(in.Events)))
for _, event := range in.Events {
wrapper.outSizeBytes.Add(event.GetSize())
}
wrapper.totalProcessTimeMs.Add(time.Now().UnixMilli() - startTime)
}