pluginmanager/plugin_runner_v2.go (457 lines of code) (raw):

// Copyright 2022 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package pluginmanager import ( "strconv" "strings" "time" "github.com/alibaba/ilogtail/pkg/helper" "github.com/alibaba/ilogtail/pkg/logger" "github.com/alibaba/ilogtail/pkg/models" "github.com/alibaba/ilogtail/pkg/pipeline" "github.com/alibaba/ilogtail/pkg/protocol" "github.com/alibaba/ilogtail/pkg/util" ) var ( tagPrefix = "__tag__:" fileOffsetKey = tagPrefix + "__file_offset__" contentKey = "content" ) type pluginv2Runner struct { // pipeline v2 fields InputPipeContext pipeline.PipelineContext ProcessPipeContext pipeline.PipelineContext AggregatePipeContext pipeline.PipelineContext FlushPipeContext pipeline.PipelineContext InputControl *pipeline.AsyncControl ProcessControl *pipeline.AsyncControl AggregateControl *pipeline.AsyncControl FlushControl *pipeline.AsyncControl MetricPlugins []*MetricWrapperV2 ServicePlugins []*ServiceWrapperV2 ProcessorPlugins []*ProcessorWrapperV2 AggregatorPlugins []*AggregatorWrapperV2 FlusherPlugins []*FlusherWrapperV2 ExtensionPlugins map[string]pipeline.Extension TimerRunner []*timerRunner FlushOutStore *FlushOutStore[models.PipelineGroupEvents] LogstoreConfig *LogstoreConfig } func (p *pluginv2Runner) Init(inputQueueSize int, flushQueueSize int) error { p.InputControl = pipeline.NewAsyncControl() p.ProcessControl = pipeline.NewAsyncControl() p.AggregateControl = pipeline.NewAsyncControl() p.FlushControl = pipeline.NewAsyncControl() p.MetricPlugins = make([]*MetricWrapperV2, 0) p.ServicePlugins = make([]*ServiceWrapperV2, 0) p.ProcessorPlugins = make([]*ProcessorWrapperV2, 0) p.AggregatorPlugins = make([]*AggregatorWrapperV2, 0) p.FlusherPlugins = make([]*FlusherWrapperV2, 0) p.ExtensionPlugins = make(map[string]pipeline.Extension, 0) p.InputPipeContext = helper.NewObservePipelineContext(inputQueueSize) p.ProcessPipeContext = helper.NewGroupedPipelineContext() p.AggregatePipeContext = helper.NewObservePipelineContext(flushQueueSize) p.FlushPipeContext = helper.NewNoopPipelineContext() p.FlushOutStore.Write(p.AggregatePipeContext.Collector().Observe()) return nil } func (p *pluginv2Runner) AddDefaultAggregatorIfEmpty() error { if len(p.AggregatorPlugins) == 0 { pluginMeta := p.LogstoreConfig.genPluginMeta("aggregator_default") logger.Debug(p.LogstoreConfig.Context.GetRuntimeContext(), "add default aggregator") if err := loadAggregator(pluginMeta, p.LogstoreConfig, nil); err != nil { return err } } return nil } func (p *pluginv2Runner) AddDefaultFlusherIfEmpty() error { return nil } func (p *pluginv2Runner) AddPlugin(pluginMeta *pipeline.PluginMeta, category pluginCategory, plugin interface{}, config map[string]interface{}) error { switch category { case pluginMetricInput: if metric, ok := plugin.(pipeline.MetricInputV2); ok { return p.addMetricInput(pluginMeta, metric, config["interval"].(int)) } case pluginServiceInput: if service, ok := plugin.(pipeline.ServiceInputV2); ok { return p.addServiceInput(pluginMeta, service) } case pluginProcessor: if processor, ok := plugin.(pipeline.ProcessorV2); ok { return p.addProcessor(pluginMeta, processor, config["priority"].(int)) } case pluginAggregator: if aggregator, ok := plugin.(pipeline.AggregatorV2); ok { return p.addAggregator(pluginMeta, aggregator) } case pluginFlusher: if flusher, ok := plugin.(pipeline.FlusherV2); ok { return p.addFlusher(pluginMeta, flusher) } case pluginExtension: if extension, ok := plugin.(pipeline.Extension); ok { return p.addExtension(pluginMeta.PluginTypeWithID, extension) } default: return pluginCategoryUndefinedError(category) } return pluginUnImplementError(category, v2, pluginMeta.PluginTypeWithID) } func (p *pluginv2Runner) GetExtension(name string) (pipeline.Extension, bool) { extension, ok := p.ExtensionPlugins[name] return extension, ok } func (p *pluginv2Runner) Run() { p.runFlusher() p.runAggregator() p.runProcessor() p.runInput() } func (p *pluginv2Runner) RunPlugins(category pluginCategory, control *pipeline.AsyncControl) { switch category { case pluginMetricInput: p.runMetricInput(control) default: } } func (p *pluginv2Runner) IsWithInputPlugin() bool { return len(p.MetricPlugins) > 0 || len(p.ServicePlugins) > 0 } func (p *pluginv2Runner) addMetricInput(pluginMeta *pipeline.PluginMeta, input pipeline.MetricInputV2, inputInterval int) error { var wrapper MetricWrapperV2 wrapper.Config = p.LogstoreConfig wrapper.Input = input err := wrapper.Init(pluginMeta, inputInterval) if err != nil { return err } p.MetricPlugins = append(p.MetricPlugins, &wrapper) p.TimerRunner = append(p.TimerRunner, &timerRunner{ initialMaxDelay: time.Duration(p.LogstoreConfig.GlobalConfig.InputMaxFirstCollectDelayMs) * time.Millisecond, interval: wrapper.Interval, state: input, context: p.LogstoreConfig.Context, }) return err } func (p *pluginv2Runner) addServiceInput(pluginMeta *pipeline.PluginMeta, input pipeline.ServiceInputV2) error { var wrapper ServiceWrapperV2 wrapper.Config = p.LogstoreConfig wrapper.Input = input p.ServicePlugins = append(p.ServicePlugins, &wrapper) err := wrapper.Init(pluginMeta) return err } func (p *pluginv2Runner) addProcessor(pluginMeta *pipeline.PluginMeta, processor pipeline.ProcessorV2, _ int) error { var wrapper ProcessorWrapperV2 wrapper.Config = p.LogstoreConfig wrapper.Processor = processor p.ProcessorPlugins = append(p.ProcessorPlugins, &wrapper) return wrapper.Init(pluginMeta) } func (p *pluginv2Runner) addAggregator(pluginMeta *pipeline.PluginMeta, aggregator pipeline.AggregatorV2) error { var wrapper AggregatorWrapperV2 wrapper.Config = p.LogstoreConfig wrapper.Aggregator = aggregator err := wrapper.Init(pluginMeta) if err != nil { logger.Error(p.LogstoreConfig.Context.GetRuntimeContext(), "AGGREGATOR_INIT_ERROR", "Aggregator failed to initialize", aggregator.Description(), "error", err) return err } p.AggregatorPlugins = append(p.AggregatorPlugins, &wrapper) p.TimerRunner = append(p.TimerRunner, &timerRunner{ state: aggregator, initialMaxDelay: wrapper.Interval, interval: wrapper.Interval, context: p.LogstoreConfig.Context, }) return nil } func (p *pluginv2Runner) addFlusher(pluginMeta *pipeline.PluginMeta, flusher pipeline.FlusherV2) error { var wrapper FlusherWrapperV2 wrapper.Config = p.LogstoreConfig wrapper.Flusher = flusher wrapper.Interval = time.Millisecond * time.Duration(p.LogstoreConfig.GlobalConfig.FlushIntervalMs) p.FlusherPlugins = append(p.FlusherPlugins, &wrapper) return wrapper.Init(pluginMeta) } func (p *pluginv2Runner) addExtension(name string, extension pipeline.Extension) error { p.ExtensionPlugins[name] = extension return nil } func (p *pluginv2Runner) runInput() { p.InputControl.Reset() p.runMetricInput(p.InputControl) for _, input := range p.ServicePlugins { service := input p.InputControl.Run(func(c *pipeline.AsyncControl) { logger.Info(p.LogstoreConfig.Context.GetRuntimeContext(), "start run service", service) defer panicRecover(service.Input.Description()) if err := service.StartService(p.InputPipeContext); err != nil { logger.Error(p.LogstoreConfig.Context.GetRuntimeContext(), "PLUGIN_ALARM", "start service error, err", err) } logger.Info(p.LogstoreConfig.Context.GetRuntimeContext(), "service done", service.Input.Description()) }) } } func (p *pluginv2Runner) runMetricInput(control *pipeline.AsyncControl) { for _, t := range p.TimerRunner { if plugin, ok := t.state.(pipeline.MetricInputV2); ok { metric := plugin timer := t control.Run(func(cc *pipeline.AsyncControl) { timer.Run(func(state interface{}) error { return metric.Read(p.InputPipeContext) }, cc) }) } else { logger.Error(p.LogstoreConfig.Context.GetRuntimeContext(), "METRIC_INPUT_V2_START_FAILURE", "type assertion", "failure") } } } func (p *pluginv2Runner) runProcessor() { p.ProcessControl.Reset() p.ProcessControl.Run(p.runProcessorInternal) } func (p *pluginv2Runner) runProcessorInternal(cc *pipeline.AsyncControl) { defer panicRecover(p.LogstoreConfig.ConfigName) pipeContext := p.ProcessPipeContext pipeChan := p.InputPipeContext.Collector().Observe() var processorTag *ProcessorTag if globalConfig := p.LogstoreConfig.GlobalConfig; globalConfig.EnableProcessorTag { processorTag = NewProcessorTag(globalConfig.PipelineMetaTagKey, globalConfig.AppendingAllEnvMetaTag, globalConfig.AgentEnvMetaTagKey) } for { select { case <-cc.CancelToken(): if len(pipeChan) == 0 { return } case group := <-pipeChan: if processorTag != nil { processorTag.ProcessV2(group) } pipeEvents := []*models.PipelineGroupEvents{group} for _, processor := range p.ProcessorPlugins { for _, in := range pipeEvents { processor.Process(in, pipeContext) } pipeEvents = pipeContext.Collector().ToArray() if len(pipeEvents) == 0 { break } } if len(pipeEvents) == 0 { break } for _, aggregator := range p.AggregatorPlugins { for _, pipeEvent := range pipeEvents { if len(pipeEvent.Events) == 0 { continue } for tryCount := 1; true; tryCount++ { err := aggregator.Record(pipeEvent, p.AggregatePipeContext) if err == nil { break } // wait until shutdown is active if tryCount%100 == 0 { logger.Warning(p.LogstoreConfig.Context.GetRuntimeContext(), "AGGREGATOR_ADD_ALARM", "error", err) } time.Sleep(time.Millisecond * 10) } } } } } } func (p *pluginv2Runner) runAggregator() { p.AggregateControl.Reset() for _, t := range p.TimerRunner { if plugin, ok := t.state.(pipeline.AggregatorV2); ok { aggregator := plugin timer := t p.AggregateControl.Run(func(cc *pipeline.AsyncControl) { timer.Run(func(state interface{}) error { return aggregator.GetResult(p.AggregatePipeContext) }, cc) }) } } } func (p *pluginv2Runner) runFlusher() { p.FlushControl.Reset() p.FlushControl.Run(p.runFlusherInternal) } func (p *pluginv2Runner) runFlusherInternal(cc *pipeline.AsyncControl) { defer panicRecover(p.LogstoreConfig.ConfigName) pipeChan := p.AggregatePipeContext.Collector().Observe() for { select { case <-cc.CancelToken(): if len(pipeChan) == 0 { return } case event := <-pipeChan: if event == nil { continue } dataSize := len(pipeChan) + 1 data := make([]*models.PipelineGroupEvents, dataSize) data[0] = event for i := 1; i < dataSize; i++ { data[i] = <-pipeChan } // Flush LogGroups to all flushers. // Note: multiple flushers is unrecommended, because all flushers will // be blocked if one of them is unready. for { allReady := true for _, flusher := range p.FlusherPlugins { if !flusher.Flusher.IsReady(p.LogstoreConfig.ProjectName, p.LogstoreConfig.LogstoreName, p.LogstoreConfig.LogstoreKey) { allReady = false break } } if allReady { for _, flusher := range p.FlusherPlugins { err := flusher.Export(data, p.FlushPipeContext) if err != nil { logger.Error(p.LogstoreConfig.Context.GetRuntimeContext(), "FLUSH_DATA_ALARM", "flush data error", p.LogstoreConfig.ProjectName, p.LogstoreConfig.LogstoreName, err) } } break } if !p.LogstoreConfig.FlushOutFlag.Load() { time.Sleep(time.Duration(10) * time.Millisecond) continue } // Config is stopping, move unflushed LogGroups to FlushOutLogGroups. logger.Info(p.LogstoreConfig.Context.GetRuntimeContext(), "flush loggroup to slice, loggroup count", dataSize) p.FlushOutStore.Add(data...) break } } } } func (p *pluginv2Runner) Stop(exit bool) error { for _, flusher := range p.FlusherPlugins { flusher.Flusher.SetUrgent(exit) } p.LogstoreConfig.FlushOutFlag.Store(true) for _, serviceInput := range p.ServicePlugins { _ = serviceInput.Input.Stop() } p.InputControl.WaitCancel() logger.Info(p.LogstoreConfig.Context.GetRuntimeContext(), "metric plugins stop", "done", "service plugins stop", "done") p.ProcessControl.WaitCancel() logger.Info(p.LogstoreConfig.Context.GetRuntimeContext(), "processor plugins stop", "done") p.AggregateControl.WaitCancel() logger.Info(p.LogstoreConfig.Context.GetRuntimeContext(), "aggregator plugins stop", "done") p.FlushControl.WaitCancel() if exit && p.FlushOutStore.Len() > 0 { flushers := make([]pipeline.FlusherV2, len(p.FlusherPlugins)) for idx, flusher := range p.FlusherPlugins { flushers[idx] = flusher.Flusher } logger.Info(p.LogstoreConfig.Context.GetRuntimeContext(), "Flushout group events, count", p.FlushOutStore.Len()) rst := flushOutStore(p.LogstoreConfig, p.FlushOutStore, p.FlusherPlugins, func(lc *LogstoreConfig, pf *FlusherWrapperV2, store *FlushOutStore[models.PipelineGroupEvents]) error { return pf.Export(store.Get(), p.FlushPipeContext) }) logger.Info(p.LogstoreConfig.Context.GetRuntimeContext(), "Flushout group events, result", rst) } for idx, flusher := range p.FlusherPlugins { if err := flusher.Flusher.Stop(); err != nil { logger.Warningf(p.LogstoreConfig.Context.GetRuntimeContext(), "STOP_FLUSHER_ALARM", "Failed to stop %vth flusher (description: %v): %v", idx, flusher.Flusher.Description(), err) } } logger.Info(p.LogstoreConfig.Context.GetRuntimeContext(), "Flusher plugins stop", "done") for _, extension := range p.ExtensionPlugins { err := extension.Stop() if err != nil { logger.Warningf(p.LogstoreConfig.Context.GetRuntimeContext(), "STOP_EXTENSION_ALARM", "failed to stop extension (description: %v): %v", extension.Description(), err) } } logger.Info(p.LogstoreConfig.Context.GetRuntimeContext(), "extension plugins stop", "done") return nil } func (p *pluginv2Runner) ReceiveLogGroup(in pipeline.LogGroupWithContext) { topic := in.LogGroup.GetTopic() meta := models.NewMetadata() if in.Context != nil { for k, v := range in.Context { value, ok := v.(string) if !ok { logger.Warningf(p.LogstoreConfig.Context.GetRuntimeContext(), "RECEIVE_LOG_GROUP_ALARM", "unknown values found in context, type is %T", v) continue } meta.Add(k, value) } } meta.Add(ctxKeyTopic, topic) tags := models.NewTags() for _, tag := range in.LogGroup.GetLogTags() { tags.Add(tag.GetKey(), tag.GetValue()) } if len(topic) > 0 { tags.Add(tagKeyLogTopic, topic) } group := models.NewGroup(meta, tags) events := make([]models.PipelineEvent, 0, len(in.LogGroup.GetLogs())) for _, log := range in.LogGroup.GetLogs() { events = append(events, p.convertToPipelineEvent(log)) } p.InputPipeContext.Collector().Collect(group, events...) } // TODO: Design the ReceiveRawLogV2, which is passed in a PipelineGroupEvents not pipeline.LogWithContext, and tags should be added in the PipelineGroupEvents. func (p *pluginv2Runner) ReceiveRawLog(in *pipeline.LogWithContext) { md := models.NewMetadata() tags := models.NewTags() if in.Context != nil { for k, v := range in.Context { switch value := v.(type) { case string: md.Add(k, value) case []*protocol.LogTag: for _, tag := range value { tags.Add(tag.GetKey(), tag.GetValue()) } default: logger.Warningf(p.LogstoreConfig.Context.GetRuntimeContext(), "RECEIVE_RAW_LOG_ALARM", "unknown values found in context, type is %T", v) } } } log := p.convertToPipelineEvent(in.Log) group := models.NewGroup(md, tags) p.InputPipeContext.Collector().Collect(group, log) } func (p *pluginv2Runner) Merge(r PluginRunner) { if other, ok := r.(*pluginv2Runner); ok { p.FlushOutStore.Merge(other.FlushOutStore) } } func (p *pluginv2Runner) convertToPipelineEvent(in *protocol.Log) models.PipelineEvent { log := &models.Log{} log.Tags = models.NewTags() for i, content := range in.Contents { switch { case content.Key == contentKey || i == 0: log.SetBody(util.ZeroCopyStringToBytes(content.Value)) case content.Key == fileOffsetKey: if offset, err := strconv.ParseInt(content.Value, 10, 64); err == nil { log.Offset = uint64(offset) } case strings.Contains(content.Key, tagPrefix): log.Tags.Add(content.Key[len(tagPrefix):], content.Value) default: log.Tags.Add(content.Key, content.Value) } } if in.Time != 0 { log.Timestamp = uint64(time.Second * time.Duration(in.Time)) if in.TimeNs != nil { log.Timestamp += uint64(*in.TimeNs) } } else { log.Timestamp = uint64(time.Now().UnixNano()) } return log }