pluginmanager/plugin_manager.go (293 lines of code) (raw):

// Copyright 2021 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pluginmanager

import (
	"context"
	"fmt"
	"runtime"
	"runtime/debug"
	"sync"
	"time"

	"github.com/alibaba/ilogtail/pkg/config"
	"github.com/alibaba/ilogtail/pkg/flags"
	"github.com/alibaba/ilogtail/pkg/helper"
	"github.com/alibaba/ilogtail/pkg/logger"
	"github.com/alibaba/ilogtail/pkg/pipeline"
)

// Following variables are exported so that tests of main package can reference them.

// LogtailConfigLock is held around every access to LogtailConfig in this file.
var LogtailConfigLock sync.RWMutex

// LogtailConfig maps a config name (with suffix) to its started LogstoreConfig.
// Entries are added by Start and removed by Stop, StopAllPipelines and
// DeleteLogstoreConfigFromLogtailConfig.
// NOTE(review): the map is declared nil here while Start writes to it;
// presumably it is initialized elsewhere in the package — confirm.
var LogtailConfig map[string]*LogstoreConfig

// Configs that are inited and will be started.
// One config may have multiple Go pipelines, such as ContainerInfo (with input) and static file (without input).
// Start resets the matching variable to nil once the pipeline has been started.
var ToStartPipelineConfigWithInput *LogstoreConfig
var ToStartPipelineConfigWithoutInput *LogstoreConfig

// ContainerConfig is the built-in container config; it is loaded by Init and
// stopped (then reset to nil) by StopBuiltInModulesConfig.
var ContainerConfig *LogstoreConfig

// Configs that were disabled because of slow or hang config.
// A config is added after its stop timed out and is removed by the stopping
// goroutine of timeoutStop once the stop eventually returns.
// DisabledLogtailConfigLock is held around every access to DisabledLogtailConfig in this file.
var DisabledLogtailConfigLock sync.RWMutex
var DisabledLogtailConfig = make(map[*LogstoreConfig]struct{})

// LastUnsendBuffer keeps, per config name, the plugin runner of a config that
// was deleted with removedFlag == false (see DeleteLogstoreConfig).
var LastUnsendBuffer = make(map[string]PluginRunner)

// Two built-in logtail configs to report statistics and alarm (from system and other logtail configs).
var AlarmConfig *LogstoreConfig var alarmConfigJSON = `{ "global": { "InputIntervalMs" : 30000, "AggregatIntervalMs": 1000, "FlushIntervalMs": 1000, "DefaultLogQueueSize": 4, "DefaultLogGroupQueueSize": 4, "Tags" : { "base_version" : "` + config.BaseVersion + `", "` + config.LoongcollectorGlobalConfig.LoongCollectorVersionTag + `" : "` + config.BaseVersion + `" } }, "inputs" : [ { "type" : "metric_alarm", "detail" : null } ] }` var containerConfigJSON = `{ "global": { "InputIntervalMs" : 30000, "AggregatIntervalMs": 1000, "FlushIntervalMs": 1000, "DefaultLogQueueSize": 4, "DefaultLogGroupQueueSize": 4, "Tags" : { "base_version" : "` + config.BaseVersion + `", "` + config.LoongcollectorGlobalConfig.LoongCollectorVersionTag + `" : "` + config.BaseVersion + `" } }, "inputs" : [ { "type" : "metric_container", "detail" : null } ] }` func panicRecover(pluginType string) { if err := recover(); err != nil { trace := make([]byte, 2048) runtime.Stack(trace, true) logger.Error(context.Background(), "PLUGIN_RUNTIME_ALARM", "plugin", pluginType, "panicked", err, "stack", string(trace)) } } // Init initializes plugin manager. func Init() (err error) { logger.Info(context.Background(), "init plugin, local env tags", helper.EnvTags) if err = CheckPointManager.Init(); err != nil { return } if AlarmConfig, err = loadBuiltinConfig("alarm", "sls-admin", "logtail_alarm", "logtail_alarm", alarmConfigJSON); err != nil { logger.Error(context.Background(), "LOAD_PLUGIN_ALARM", "load alarm config fail", err) return } if ContainerConfig, err = loadBuiltinConfig("container", "sls-admin", "logtail_containers", "logtail_containers", containerConfigJSON); err != nil { logger.Error(context.Background(), "LOAD_PLUGIN_ALARM", "load container config fail", err) return } logger.Info(context.Background(), "loadBuiltinConfig container") return } // timeoutStop wrappers LogstoreConfig.Stop with timeout (5s by default). // @return true if Stop returns before timeout, otherwise false. 
func timeoutStop(config *LogstoreConfig, removedFlag bool) bool { done := make(chan int) go func() { addressStr := fmt.Sprintf("%p", config) logger.Info(config.Context.GetRuntimeContext(), "Stop config in goroutine", "begin", "LogstoreConfig", addressStr) _ = config.Stop(removedFlag) close(done) logger.Info(context.Background(), "Stop config in goroutine", "end", "LogstoreConfig", addressStr) // The config is valid but stop slowly, allow it to load again. DisabledLogtailConfigLock.Lock() if _, exists := DisabledLogtailConfig[config]; !exists { DisabledLogtailConfigLock.Unlock() return } logger.Info(context.Background(), "Valid but slow stop config", config.ConfigName, "LogstoreConfig", addressStr) DeleteLogstoreConfig(config, removedFlag) delete(DisabledLogtailConfig, config) DisabledLogtailConfigLock.Unlock() }() select { case <-done: return true case <-time.After(30 * time.Second): return false } } // StopAllPipelines stops all pipelines so that it is ready // to quit. // For user-defined config, timeoutStop is used to avoid hanging. func StopAllPipelines(withInput bool) error { defer panicRecover("Run plugin") LogtailConfigLock.Lock() toDeleteConfigNames := make(map[string]struct{}) for configName, logstoreConfig := range LogtailConfig { needStop := false if withInput { // if request is withinput=true, only stop logstoreConfig.PluginRunner.IsWithInputPlugin=true if logstoreConfig.PluginRunner.IsWithInputPlugin() { needStop = true } } else { // if request is withinput=false, only stop logstoreConfig.PluginRunner.IsWithInputPlugin=false if !logstoreConfig.PluginRunner.IsWithInputPlugin() { needStop = true } } if needStop { logger.Info(logstoreConfig.Context.GetRuntimeContext(), "Stop config", configName) if hasStopped := timeoutStop(logstoreConfig, true); !hasStopped { // TODO: This alarm can not be sent to server in current alarm design. 
logger.Error(logstoreConfig.Context.GetRuntimeContext(), "CONFIG_STOP_TIMEOUT_ALARM", "timeout when stop config, goroutine might leak") // TODO: The key should be versioned. Current implementation will overwrite the previous version when reload a block config multiple times. DisabledLogtailConfigLock.Lock() DisabledLogtailConfig[logstoreConfig] = struct{}{} DisabledLogtailConfigLock.Unlock() } else { DeleteLogstoreConfig(logstoreConfig, true) } toDeleteConfigNames[configName] = struct{}{} } } for key := range toDeleteConfigNames { delete(LogtailConfig, key) } LogtailConfigLock.Unlock() return nil } func DeleteLogstoreConfig(config *LogstoreConfig, removedFlag bool) { if actualObject, ok := config.Context.(*ContextImp); ok { actualObject.logstoreC = nil } config.Context = nil if runner, ok := config.PluginRunner.(*pluginv1Runner); ok { for _, obj := range runner.MetricPlugins { obj.Config = nil } for _, obj := range runner.ServicePlugins { obj.Config = nil } for _, obj := range runner.ProcessorPlugins { obj.Config = nil } for _, obj := range runner.AggregatorPlugins { obj.Config = nil } for _, obj := range runner.FlusherPlugins { obj.Config = nil } runner.LogstoreConfig = nil } else if runner, ok := config.PluginRunner.(*pluginv2Runner); ok { for _, obj := range runner.MetricPlugins { obj.Config = nil } for _, obj := range runner.ServicePlugins { obj.Config = nil } for _, obj := range runner.ProcessorPlugins { obj.Config = nil } for _, obj := range runner.AggregatorPlugins { obj.Config = nil } for _, obj := range runner.FlusherPlugins { obj.Config = nil } runner.LogstoreConfig = nil } if !removedFlag { LastUnsendBuffer[config.ConfigName] = config.PluginRunner } config.PluginRunner = nil } func DeleteLogstoreConfigFromLogtailConfig(configName string, removedFlag bool) { LogtailConfigLock.Lock() if config, ok := LogtailConfig[configName]; ok { DeleteLogstoreConfig(config, removedFlag) delete(LogtailConfig, configName) } LogtailConfigLock.Unlock() } // 
StopBuiltInModulesConfig stops built-in services (self monitor, alarm, container and checkpoint manager). func StopBuiltInModulesConfig() { if AlarmConfig != nil { if *flags.ForceSelfCollect { logger.Info(context.Background(), "force collect the alarm metrics") control := pipeline.NewAsyncControl() AlarmConfig.PluginRunner.RunPlugins(pluginMetricInput, control) control.WaitCancel() } _ = AlarmConfig.Stop(true) AlarmConfig = nil } if ContainerConfig != nil { if *flags.ForceSelfCollect { logger.Info(context.Background(), "force collect the container metrics") control := pipeline.NewAsyncControl() ContainerConfig.PluginRunner.RunPlugins(pluginMetricInput, control) control.WaitCancel() } _ = ContainerConfig.Stop(true) ContainerConfig = nil } CheckPointManager.Stop() } // Stop stop the given config. ConfigName is with suffix. func Stop(configName string, removedFlag bool) error { defer panicRecover("Run plugin") LogtailConfigLock.RLock() if config, exists := LogtailConfig[configName]; exists { LogtailConfigLock.RUnlock() if hasStopped := timeoutStop(config, removedFlag); !hasStopped { logger.Error(config.Context.GetRuntimeContext(), "CONFIG_STOP_TIMEOUT_ALARM", "timeout when stop config, goroutine might leak") DisabledLogtailConfigLock.Lock() DisabledLogtailConfig[config] = struct{}{} DisabledLogtailConfigLock.Unlock() LogtailConfigLock.Lock() delete(LogtailConfig, configName) LogtailConfigLock.Unlock() } else { logger.Info(config.Context.GetRuntimeContext(), "Stop config now", configName) LogtailConfigLock.Lock() DeleteLogstoreConfig(config, removedFlag) delete(LogtailConfig, configName) LogtailConfigLock.Unlock() } return nil } LogtailConfigLock.RUnlock() return fmt.Errorf("config not found: %s", configName) } // Start starts the given config. ConfigName is with suffix. 
func Start(configName string) error { defer panicRecover("Run plugin") if ToStartPipelineConfigWithInput != nil && ToStartPipelineConfigWithInput.ConfigNameWithSuffix == configName { ToStartPipelineConfigWithInput.Start() LogtailConfigLock.Lock() LogtailConfig[ToStartPipelineConfigWithInput.ConfigNameWithSuffix] = ToStartPipelineConfigWithInput LogtailConfigLock.Unlock() ToStartPipelineConfigWithInput = nil return nil } else if ToStartPipelineConfigWithoutInput != nil && ToStartPipelineConfigWithoutInput.ConfigNameWithSuffix == configName { ToStartPipelineConfigWithoutInput.Start() LogtailConfigLock.Lock() LogtailConfig[ToStartPipelineConfigWithoutInput.ConfigNameWithSuffix] = ToStartPipelineConfigWithoutInput LogtailConfigLock.Unlock() ToStartPipelineConfigWithoutInput = nil return nil } // should never happen var loadedConfigName string if ToStartPipelineConfigWithInput != nil { loadedConfigName = ToStartPipelineConfigWithInput.ConfigNameWithSuffix } if ToStartPipelineConfigWithoutInput != nil { loadedConfigName += " " + ToStartPipelineConfigWithoutInput.ConfigNameWithSuffix } return fmt.Errorf("config unmatch with the loaded pipeline: given %s, expect %s", configName, loadedConfigName) } func init() { go func() { for { // force gc every 3 minutes time.Sleep(time.Minute * 3) logger.Debug(context.Background(), "force gc done", time.Now()) runtime.GC() logger.Debug(context.Background(), "force gc done", time.Now()) debug.FreeOSMemory() logger.Debug(context.Background(), "free os memory done", time.Now()) if logger.DebugFlag() { gcStat := debug.GCStats{} debug.ReadGCStats(&gcStat) logger.Debug(context.Background(), "gc stats", gcStat) memStat := runtime.MemStats{} runtime.ReadMemStats(&memStat) logger.Debug(context.Background(), "mem stats", memStat) } } }() }