pluginmanager/plugin_wrapper_aggregator_v2.go (50 lines of code) (raw):
// Copyright 2024 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pluginmanager
import (
"time"
"github.com/alibaba/ilogtail/pkg/logger"
"github.com/alibaba/ilogtail/pkg/models"
"github.com/alibaba/ilogtail/pkg/pipeline"
"github.com/alibaba/ilogtail/pkg/protocol"
"github.com/alibaba/ilogtail/pkg/selfmonitor"
)
// AggregatorWrapperV2 wraps a pipeline.AggregatorV2 plugin.
// It embeds AggregatorWrapper and passes itself to the wrapped aggregator
// during Init. Record and GetResult delegate to the wrapped aggregator, with
// Record additionally updating output and delay metrics.
// Add and AddWithWait are no-ops on this wrapper: they ignore their
// arguments and always return nil.
// NOTE(review): the previous comment said the wrapper implements the
// LogGroupQueue interface and forwards log groups to the associated
// LogstoreConfig through LogGroupsChan; no forwarding is visible in this
// file, so that presumably applies to the embedded AggregatorWrapper only —
// confirm.
type AggregatorWrapperV2 struct {
// Embedded base wrapper; Config, Interval, MetricRecord, InitMetricRecord and
// the out* counters used by the methods below are promoted through it.
AggregatorWrapper
// Aggregator is the wrapped V2 aggregator that Init, Record and GetResult
// delegate to.
Aggregator pipeline.AggregatorV2
// totalDelayTimeMs accumulates the milliseconds spent inside
// Aggregator.Record calls; it is registered in Init.
totalDelayTimeMs selfmonitor.CounterMetric
}
// Init prepares the wrapper and the wrapped aggregator for use.
//
// It calls InitMetricRecord with pluginMeta, registers the total-delay
// counter on the wrapper's metric record, and then initializes the wrapped
// aggregator, handing it the configured context and the wrapper itself.
// When the aggregator fails to initialize, the failure is logged and the
// same error is returned. When the aggregator reports an interval of 0, the
// global AggregatIntervalMs setting is used instead. The resulting value is
// treated as milliseconds and stored in wrapper.Interval.
func (wrapper *AggregatorWrapperV2) Init(pluginMeta *pipeline.PluginMeta) error {
	wrapper.InitMetricRecord(pluginMeta)
	wrapper.totalDelayTimeMs = selfmonitor.NewCounterMetricAndRegister(wrapper.MetricRecord, selfmonitor.MetricPluginTotalDelayMs)

	intervalMs, initErr := wrapper.Aggregator.Init(wrapper.Config.Context, wrapper)
	if initErr != nil {
		logger.Error(wrapper.Config.Context.GetRuntimeContext(), "AGGREGATOR_INIT_ERROR", "Aggregator failed to initialize", wrapper.Aggregator.Description(), "error", initErr)
		return initErr
	}

	// A zero interval means the aggregator did not pick one; use the global value.
	if intervalMs == 0 {
		intervalMs = wrapper.Config.GlobalConfig.AggregatIntervalMs
	}
	wrapper.Interval = time.Duration(intervalMs) * time.Millisecond
	return nil
}
// Record forwards events to the wrapped aggregator and updates metrics.
//
// On success it adds the number of events to outEventsTotal, adds one to
// outEventGroupsTotal, and adds each event's GetSize() to outSizeBytes. The
// time spent in the aggregator call and the metric updates is added to
// totalDelayTimeMs in milliseconds whether or not the aggregator returned an
// error. The aggregator's error, if any, is returned unchanged.
func (wrapper *AggregatorWrapperV2) Record(events *models.PipelineGroupEvents, context pipeline.PipelineContext) error {
	begin := time.Now()

	recordErr := wrapper.Aggregator.Record(events, context)
	if recordErr == nil {
		batch := events.Events
		wrapper.outEventsTotal.Add(int64(len(batch)))
		wrapper.outEventGroupsTotal.Add(1)
		for i := range batch {
			wrapper.outSizeBytes.Add(batch[i].GetSize())
		}
	}

	// The delay is recorded on both the success and the failure path.
	elapsedMs := time.Since(begin).Milliseconds()
	wrapper.totalDelayTimeMs.Add(elapsedMs)
	return recordErr
}
// GetResult delegates to the wrapped aggregator's GetResult with the given
// context and returns whatever error that call produces.
func (wrapper *AggregatorWrapperV2) GetResult(context pipeline.PipelineContext) error {
	resultErr := wrapper.Aggregator.GetResult(context)
	return resultErr
}
// Add is a no-op on the V2 wrapper: loggroup is ignored and nil is always
// returned.
// NOTE(review): presumably kept only so the wrapper satisfies the queue
// interface expected by Aggregator.Init — confirm against that interface.
func (wrapper *AggregatorWrapperV2) Add(loggroup *protocol.LogGroup) error {
return nil
}
// AddWithWait is a no-op on the V2 wrapper: loggroup and duration are
// ignored, no waiting takes place, and nil is always returned.
// NOTE(review): presumably kept only so the wrapper satisfies the queue
// interface expected by Aggregator.Init — confirm against that interface.
func (wrapper *AggregatorWrapperV2) AddWithWait(loggroup *protocol.LogGroup, duration time.Duration) error {
return nil
}