pluginmanager/plugin_runner_v1.go
// Copyright 2022 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pluginmanager

import (
	"time"

	"github.com/alibaba/ilogtail/pkg/flags"
	"github.com/alibaba/ilogtail/pkg/helper/math"
	"github.com/alibaba/ilogtail/pkg/logger"
	"github.com/alibaba/ilogtail/pkg/pipeline"
	"github.com/alibaba/ilogtail/pkg/protocol"
	"github.com/alibaba/ilogtail/pkg/util"
)
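// pluginv1Runner drives the v1 (protocol.Log based) pipeline of a single
// LogstoreConfig: inputs feed LogsChan, processors and aggregators turn logs
// into log groups on LogGroupsChan, and flushers drain that channel. Each
// stage runs under its own AsyncControl.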
type pluginv1Runner struct {
// pipeline v1 fields
LogsChan chan *pipeline.LogWithContext
LogGroupsChan chan *protocol.LogGroup
MetricPlugins []*MetricWrapperV1
ServicePlugins []*ServiceWrapperV1
ProcessorPlugins []*ProcessorWrapperV1
AggregatorPlugins []*AggregatorWrapperV1
FlusherPlugins []*FlusherWrapperV1
ExtensionPlugins map[string]pipeline.Extension
FlushOutStore *FlushOutStore[protocol.LogGroup]
LogstoreConfig *LogstoreConfig
InputControl *pipeline.AsyncControl
ProcessControl *pipeline.AsyncControl
AggregateControl *pipeline.AsyncControl
FlushControl *pipeline.AsyncControl
}
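// Init prepares the runner: it creates the per-stage AsyncControls, the empty
// plugin collections and the two channels, and replays any log groups still
// held in FlushOutStore into LogGroupsChan.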
func (p *pluginv1Runner) Init(inputQueueSize int, flushQueueSize int) error {
p.InputControl = pipeline.NewAsyncControl()
p.ProcessControl = pipeline.NewAsyncControl()
p.AggregateControl = pipeline.NewAsyncControl()
p.FlushControl = pipeline.NewAsyncControl()
p.MetricPlugins = make([]*MetricWrapperV1, 0)
p.ServicePlugins = make([]*ServiceWrapperV1, 0)
p.ProcessorPlugins = make([]*ProcessorWrapperV1, 0)
p.AggregatorPlugins = make([]*AggregatorWrapperV1, 0)
p.FlusherPlugins = make([]*FlusherWrapperV1, 0)
p.ExtensionPlugins = make(map[string]pipeline.Extension, 0)
p.LogsChan = make(chan *pipeline.LogWithContext, inputQueueSize)
p.LogGroupsChan = make(chan *protocol.LogGroup, math.Max(flushQueueSize, p.FlushOutStore.Len()))
p.FlushOutStore.Write(p.LogGroupsChan)
return nil
}
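// AddDefaultAggregatorIfEmpty loads aggregator_default when the config
// declares no aggregator.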
func (p *pluginv1Runner) AddDefaultAggregatorIfEmpty() error {
if len(p.AggregatorPlugins) == 0 {
pluginMeta := p.LogstoreConfig.genPluginMeta("aggregator_default")
logger.Debug(p.LogstoreConfig.Context.GetRuntimeContext(), "add default aggregator")
if err := loadAggregator(pluginMeta, p.LogstoreConfig, nil); err != nil {
return err
}
}
return nil
}
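// AddDefaultFlusherIfEmpty loads the flusher configured via flags when the
// config declares no flusher.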
func (p *pluginv1Runner) AddDefaultFlusherIfEmpty() error {
if len(p.FlusherPlugins) == 0 {
logger.Debug(p.LogstoreConfig.Context.GetRuntimeContext(), "add default flusher")
category, options := flags.GetFlusherConfiguration()
pluginMeta := p.LogstoreConfig.genPluginMeta(category)
if err := loadFlusher(pluginMeta, p.LogstoreConfig, options); err != nil {
return err
}
}
return nil
}
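// AddPlugin registers a loaded plugin instance in the slot matching its
// category. Metric inputs take their interval and processors their priority
// from config; an instance that does not implement the expected v1 interface
// yields pluginUnImplementError.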
func (p *pluginv1Runner) AddPlugin(pluginMeta *pipeline.PluginMeta, category pluginCategory, plugin interface{}, config map[string]interface{}) error {
switch category {
case pluginMetricInput:
if metric, ok := plugin.(pipeline.MetricInputV1); ok {
return p.addMetricInput(pluginMeta, metric, config["interval"].(int))
}
case pluginServiceInput:
if service, ok := plugin.(pipeline.ServiceInputV1); ok {
return p.addServiceInput(pluginMeta, service)
}
case pluginProcessor:
if processor, ok := plugin.(pipeline.ProcessorV1); ok {
return p.addProcessor(pluginMeta, processor, config["priority"].(int))
}
case pluginAggregator:
if aggregator, ok := plugin.(pipeline.AggregatorV1); ok {
return p.addAggregator(pluginMeta, aggregator)
}
case pluginFlusher:
if flusher, ok := plugin.(pipeline.FlusherV1); ok {
return p.addFlusher(pluginMeta, flusher)
}
case pluginExtension:
if extension, ok := plugin.(pipeline.Extension); ok {
return p.addExtension(pluginMeta.PluginTypeWithID, extension)
}
default:
return pluginCategoryUndefinedError(category)
}
return pluginUnImplementError(category, v1, pluginMeta.PluginTypeWithID)
}
func (p *pluginv1Runner) GetExtension(name string) (pipeline.Extension, bool) {
extension, ok := p.ExtensionPlugins[name]
return extension, ok
}
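// Run starts the stages back to front (flusher, aggregator, processor, input)
// so that downstream stages are already consuming when the inputs start
// producing.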
func (p *pluginv1Runner) Run() {
p.runFlusher()
p.runAggregator()
p.runProcessor()
p.runInput()
}
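// RunPlugins restarts the plugins of one category on the given control; only
// metric inputs are handled here, other categories are no-ops.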
func (p *pluginv1Runner) RunPlugins(category pluginCategory, control *pipeline.AsyncControl) {
switch category {
case pluginMetricInput:
p.runMetricInput(control)
default:
}
}
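// IsWithInputPlugin reports whether the config has at least one metric or
// service input.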
func (p *pluginv1Runner) IsWithInputPlugin() bool {
return len(p.MetricPlugins) > 0 || len(p.ServicePlugins) > 0
}
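// addMetricInput wraps a v1 metric input together with its collect interval
// and wires it to LogsChan.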
func (p *pluginv1Runner) addMetricInput(pluginMeta *pipeline.PluginMeta, input pipeline.MetricInputV1, inputInterval int) error {
var wrapper MetricWrapperV1
wrapper.Config = p.LogstoreConfig
wrapper.Input = input
wrapper.LogsChan = p.LogsChan
p.MetricPlugins = append(p.MetricPlugins, &wrapper)
return wrapper.Init(pluginMeta, inputInterval)
}
func (p *pluginv1Runner) addServiceInput(pluginMeta *pipeline.PluginMeta, input pipeline.ServiceInputV1) error {
var wrapper ServiceWrapperV1
wrapper.Config = p.LogstoreConfig
wrapper.Input = input
wrapper.LogsChan = p.LogsChan
p.ServicePlugins = append(p.ServicePlugins, &wrapper)
return wrapper.Init(pluginMeta)
}
func (p *pluginv1Runner) addProcessor(pluginMeta *pipeline.PluginMeta, processor pipeline.ProcessorV1, priority int) error {
var wrapper ProcessorWrapperV1
wrapper.Config = p.LogstoreConfig
wrapper.Processor = processor
wrapper.LogsChan = p.LogsChan
wrapper.Priority = priority
p.ProcessorPlugins = append(p.ProcessorPlugins, &wrapper)
return wrapper.Init(pluginMeta)
}
func (p *pluginv1Runner) addAggregator(pluginMeta *pipeline.PluginMeta, aggregator pipeline.AggregatorV1) error {
var wrapper AggregatorWrapperV1
wrapper.Config = p.LogstoreConfig
wrapper.Aggregator = aggregator
wrapper.LogGroupsChan = p.LogGroupsChan
p.AggregatorPlugins = append(p.AggregatorPlugins, &wrapper)
return wrapper.Init(pluginMeta)
}
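// addFlusher wraps a v1 flusher and wires it to LogGroupsChan; the flush
// interval is taken from the global FlushIntervalMs setting.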
func (p *pluginv1Runner) addFlusher(pluginMeta *pipeline.PluginMeta, flusher pipeline.FlusherV1) error {
var wrapper FlusherWrapperV1
wrapper.Config = p.LogstoreConfig
wrapper.Flusher = flusher
wrapper.LogGroupsChan = p.LogGroupsChan
wrapper.Interval = time.Millisecond * time.Duration(p.LogstoreConfig.GlobalConfig.FlushIntervalMs)
p.FlusherPlugins = append(p.FlusherPlugins, &wrapper)
return wrapper.Init(pluginMeta)
}
func (p *pluginv1Runner) addExtension(name string, extension pipeline.Extension) error {
p.ExtensionPlugins[name] = extension
return nil
}
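// runInput resets InputControl, then starts all metric inputs and one
// goroutine per service input under it.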
func (p *pluginv1Runner) runInput() {
p.InputControl.Reset()
p.runMetricInput(p.InputControl)
for _, service := range p.ServicePlugins {
s := service
p.InputControl.Run(s.Run)
}
}
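// runMetricInput schedules every metric input on a timerRunner: each input
// collects on its own interval, with the first collection delayed by at most
// InputMaxFirstCollectDelayMs.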
func (p *pluginv1Runner) runMetricInput(async *pipeline.AsyncControl) {
for _, metric := range p.MetricPlugins {
m := metric
runner := &timerRunner{
initialMaxDelay: time.Duration(p.LogstoreConfig.GlobalConfig.InputMaxFirstCollectDelayMs) * time.Millisecond,
state: m.Input,
interval: m.Interval,
context: m.Config.Context,
}
async.Run(func(ac *pipeline.AsyncControl) {
runner.Run(func(state interface{}) error {
return m.Input.Collect(m)
}, ac)
})
}
}
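// runProcessor starts the single processing goroutine under ProcessControl.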
func (p *pluginv1Runner) runProcessor() {
p.ProcessControl.Reset()
p.ProcessControl.Run(p.runProcessorInternal)
}
// runProcessorInternal is the processing routine. Each LogstoreConfig runs it
// in a single goroutine, so all processors of the config share that
// goroutine. Every log received from LogsChan is passed through the
// processors in order (log -> p1 -> p2 -> p3), and the surviving logs are
// handed to the aggregators, retrying until each aggregator accepts them.
// It returns once the cancel token fires and LogsChan has been drained.
func (p *pluginv1Runner) runProcessorInternal(cc *pipeline.AsyncControl) {
defer panicRecover(p.LogstoreConfig.ConfigName)
var logCtx *pipeline.LogWithContext
var processorTag *ProcessorTag
if globalConfig := p.LogstoreConfig.GlobalConfig; globalConfig.EnableProcessorTag {
processorTag = NewProcessorTag(globalConfig.PipelineMetaTagKey, globalConfig.AppendingAllEnvMetaTag, globalConfig.AgentEnvMetaTagKey)
}
for {
select {
case <-cc.CancelToken():
if len(p.LogsChan) == 0 {
return
}
case logCtx = <-p.LogsChan:
if processorTag != nil {
processorTag.ProcessV1(logCtx)
}
logs := []*protocol.Log{logCtx.Log}
for _, processor := range p.ProcessorPlugins {
logs = processor.Process(logs)
if len(logs) == 0 {
break
}
}
nowTime := time.Now()
if len(logs) > 0 {
for _, aggregator := range p.AggregatorPlugins {
for _, l := range logs {
if len(l.Contents) == 0 {
continue
}
if l.Time == uint32(0) {
protocol.SetLogTime(l, uint32(nowTime.Unix()))
}
if !p.LogstoreConfig.GlobalConfig.EnableTimestampNanosecond {
l.TimeNs = nil
}
for tryCount := 1; true; tryCount++ {
err := aggregator.Aggregator.Add(l, logCtx.Context)
if err == nil {
break
}
// Retry until the aggregator accepts the log, warning roughly once per
// second (every 100 attempts with a 10ms sleep between attempts).
if tryCount%100 == 0 {
logger.Warning(p.LogstoreConfig.Context.GetRuntimeContext(), "AGGREGATOR_ADD_ALARM", "error", err)
}
time.Sleep(time.Millisecond * 10)
}
}
}
}
}
}
}
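// runAggregator starts one goroutine per aggregator under AggregateControl.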
func (p *pluginv1Runner) runAggregator() {
p.AggregateControl.Reset()
for _, aggregator := range p.AggregatorPlugins {
a := aggregator
p.AggregateControl.Run(a.Run)
}
}
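// runFlusher starts the single flushing goroutine under FlushControl.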
func (p *pluginv1Runner) runFlusher() {
p.FlushControl.Reset()
p.FlushControl.Run(p.runFlusherInternal)
}
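// runFlusherInternal drains LogGroupsChan in batches, stamps each non-empty
// group with the local IP as Source, and hands the batch to every flusher
// once all of them report ready. While the config is stopping (FlushOutFlag
// set), log groups that cannot be flushed are parked in FlushOutStore
// instead.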
func (p *pluginv1Runner) runFlusherInternal(cc *pipeline.AsyncControl) {
defer panicRecover(p.LogstoreConfig.ConfigName)
var logGroup *protocol.LogGroup
for {
select {
case <-cc.CancelToken():
if len(p.LogGroupsChan) == 0 {
return
}
case logGroup = <-p.LogGroupsChan:
if logGroup == nil {
continue
}
listLen := len(p.LogGroupsChan) + 1
logGroups := make([]*protocol.LogGroup, listLen)
logGroups[0] = logGroup
for i := 1; i < listLen; i++ {
logGroups[i] = <-p.LogGroupsChan
}
for _, logGroup := range logGroups {
if len(logGroup.Logs) == 0 {
continue
}
logGroup.Source = util.GetIPAddress()
}
// Flush LogGroups to all flushers.
// Note: using multiple flushers is not recommended, because all of them
// are blocked whenever any single flusher is not ready.
for {
allReady := true
for _, flusher := range p.FlusherPlugins {
if !flusher.Flusher.IsReady(p.LogstoreConfig.ProjectName,
p.LogstoreConfig.LogstoreName, p.LogstoreConfig.LogstoreKey) {
allReady = false
break
}
}
if allReady {
for _, flusher := range p.FlusherPlugins {
err := flusher.Flush(p.LogstoreConfig.ProjectName,
p.LogstoreConfig.LogstoreName, p.LogstoreConfig.ConfigName, logGroups)
if err != nil {
logger.Error(p.LogstoreConfig.Context.GetRuntimeContext(), "FLUSH_DATA_ALARM", "flush data error",
p.LogstoreConfig.ProjectName, p.LogstoreConfig.LogstoreName, err)
}
}
break
}
if !p.LogstoreConfig.FlushOutFlag.Load() {
time.Sleep(time.Duration(10) * time.Millisecond)
continue
}
// Config is stopping, move unflushed LogGroups to FlushOutStore.
logger.Info(p.LogstoreConfig.Context.GetRuntimeContext(), "flush loggroup to slice, loggroup count", listLen)
p.FlushOutStore.Add(logGroups...)
break
}
}
}
}
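// Stop shuts the pipeline down from front to back: inputs first, then the
// processor, aggregators and the flush loop. When exiting, any log groups
// left in FlushOutStore are flushed once more before the flushers and
// extensions are stopped.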
func (p *pluginv1Runner) Stop(exit bool) error {
for _, flusher := range p.FlusherPlugins {
flusher.Flusher.SetUrgent(exit)
}
p.LogstoreConfig.FlushOutFlag.Store(true)
for _, service := range p.ServicePlugins {
_ = service.Stop()
}
p.InputControl.WaitCancel()
logger.Info(p.LogstoreConfig.Context.GetRuntimeContext(), "metric plugins stop", "done", "service plugins stop", "done")
p.ProcessControl.WaitCancel()
logger.Info(p.LogstoreConfig.Context.GetRuntimeContext(), "processor plugins stop", "done")
p.AggregateControl.WaitCancel()
logger.Info(p.LogstoreConfig.Context.GetRuntimeContext(), "aggregator plugins stop", "done")
p.FlushControl.WaitCancel()
if exit && p.FlushOutStore.Len() > 0 {
flushers := make([]pipeline.FlusherV1, len(p.FlusherPlugins))
for idx, flusher := range p.FlusherPlugins {
flushers[idx] = flusher.Flusher
}
logger.Info(p.LogstoreConfig.Context.GetRuntimeContext(), "flushout loggroups, count", p.FlushOutStore.Len())
rst := flushOutStore(p.LogstoreConfig, p.FlushOutStore, p.FlusherPlugins, func(lc *LogstoreConfig, sf *FlusherWrapperV1, store *FlushOutStore[protocol.LogGroup]) error {
return sf.Flusher.Flush(lc.Context.GetProject(), lc.Context.GetLogstore(), lc.Context.GetConfigName(), store.Get())
})
logger.Info(p.LogstoreConfig.Context.GetRuntimeContext(), "flushout loggroups, result", rst)
}
for idx, flusher := range p.FlusherPlugins {
if err := flusher.Flusher.Stop(); err != nil {
logger.Warningf(p.LogstoreConfig.Context.GetRuntimeContext(), "STOP_FLUSHER_ALARM",
"Failed to stop %vth flusher (description: %v): %v",
idx, flusher.Flusher.Description(), err)
}
}
logger.Info(p.LogstoreConfig.Context.GetRuntimeContext(), "flusher plugins stop", "done")
for _, extension := range p.ExtensionPlugins {
err := extension.Stop()
if err != nil {
logger.Warningf(p.LogstoreConfig.Context.GetRuntimeContext(), "STOP_EXTENSION_ALARM",
"failed to stop extension (description: %v): %v", extension.Description(), err)
}
}
logger.Info(p.LogstoreConfig.Context.GetRuntimeContext(), "extension plugins stop", "done")
return nil
}
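// ReceiveRawLog feeds a single log and its context into LogsChan for
// processing.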
func (p *pluginv1Runner) ReceiveRawLog(log *pipeline.LogWithContext) {
p.LogsChan <- log
}
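// ReceiveLogGroup splits a LogGroup into single logs for LogsChan. The topic
// (if any) is appended to each log's contents; the group tags either travel
// in the log context (UsingOldContentTag == false) or are appended to the
// contents with the tag prefix (UsingOldContentTag == true).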
func (p *pluginv1Runner) ReceiveLogGroup(logGroup pipeline.LogGroupWithContext) {
topic := logGroup.LogGroup.GetTopic()
for _, log := range logGroup.LogGroup.Logs {
if len(topic) > 0 {
log.Contents = append(log.Contents, &protocol.Log_Content{Key: tagKeyLogTopic, Value: topic})
}
// When UsingOldContentTag is set to false, the tags are put into the
// context (as is done on the cgo path) instead of the log contents.
if !p.LogstoreConfig.GlobalConfig.UsingOldContentTag {
context := map[string]interface{}{}
for key, value := range logGroup.Context {
context[key] = value
}
context[ctxKeyTopic] = topic
context[ctxKeyTags] = logGroup.LogGroup.LogTags
p.ReceiveRawLog(&pipeline.LogWithContext{Log: log, Context: context})
} else {
context := map[string]interface{}{}
for key, value := range logGroup.Context {
context[key] = value
}
context[ctxKeyTopic] = topic
for _, tag := range logGroup.LogGroup.LogTags {
log.Contents = append(log.Contents, &protocol.Log_Content{
Key: tagPrefix + tag.GetKey(),
Value: tag.GetValue(),
})
}
p.ReceiveRawLog(&pipeline.LogWithContext{Log: log, Context: context})
}
}
}
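// Merge takes over the FlushOutStore of another v1 runner so that its
// unflushed log groups are not lost.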
func (p *pluginv1Runner) Merge(r PluginRunner) {
if other, ok := r.(*pluginv1Runner); ok {
p.FlushOutStore.Merge(other.FlushOutStore)
}
}