pluginmanager/logstore_config.go (731 lines of code) (raw):

// Copyright 2021 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package pluginmanager import ( "bytes" "context" "crypto/md5" //nolint:gosec "encoding/json" "fmt" "strconv" "strings" "sync/atomic" "github.com/alibaba/ilogtail/pkg/config" "github.com/alibaba/ilogtail/pkg/logger" "github.com/alibaba/ilogtail/pkg/models" "github.com/alibaba/ilogtail/pkg/pipeline" "github.com/alibaba/ilogtail/pkg/protocol" "github.com/alibaba/ilogtail/plugins/input" ) var maxFlushOutTime = 5 const mixProcessModeFlag = "mix_process_mode" type mixProcessMode int const ( normal mixProcessMode = iota file observer ) // checkMixProcessMode // When inputs plugins not exist, it means this LogConfig is a mixed process mode config. // And the default mix process mode is the file mode. func checkMixProcessMode(pluginCfg map[string]interface{}) mixProcessMode { config, exists := pluginCfg["inputs"] inputs, ok := config.([]interface{}) if exists && ok && len(inputs) > 0 { return normal } mixModeFlag, mixModeFlagOk := pluginCfg[mixProcessModeFlag] if !mixModeFlagOk { return file } s := mixModeFlag.(string) switch { case strings.EqualFold(s, "observer"): return observer default: return file } } type ConfigVersion string var ( v1 ConfigVersion = "v1" v2 ConfigVersion = "v2" ) type LogstoreConfig struct { // common fields ProjectName string LogstoreName string ConfigName string ConfigNameWithSuffix string LogstoreKey int64 FlushOutFlag atomic.Bool // Each LogstoreConfig can have its independent GlobalConfig if the "global" field // is offered in configuration, see build-in AlarmConfig. GlobalConfig *config.GlobalConfig Version ConfigVersion Context pipeline.Context PluginRunner PluginRunner // private fields configDetailHash string K8sLabelSet map[string]struct{} ContainerLabelSet map[string]struct{} EnvSet map[string]struct{} CollectingContainersMeta bool pluginID int32 } // Start initializes plugin instances in config and starts them. // Procedures: // 1. Start flusher goroutine and push FlushOutLogGroups inherited from last config // instance to LogGroupsChan, so that they can be flushed to flushers. // 2. Start aggregators, allocate new goroutine for each one. // 3. Start processor goroutine to process logs from LogsChan. // 4. Start inputs (including metrics and services), just like aggregator, each input // has its own goroutine. func (lc *LogstoreConfig) Start() { lc.FlushOutFlag.Store(false) logger.Info(lc.Context.GetRuntimeContext(), "config start", "begin") lc.PluginRunner.Run() logger.Info(lc.Context.GetRuntimeContext(), "config start", "success") } // Stop stops plugin instances and corresponding goroutines of config. // @removedFlag passed from C++, indicates that if config will be removed after this. // Procedures: // 1. SetUrgent to all flushers to indicate them current state. // 2. Stop all input plugins, stop generating logs. // 3. Stop processor goroutine, pass all existing logs to aggregator. // 4. Stop all aggregator plugins, make all logs to LogGroups. // 5. Set stopping flag, stop flusher goroutine. // 6. If config will be removed and there are remaining data, try to flush once. // 7. Stop flusher plugins. func (lc *LogstoreConfig) Stop(removedFlag bool) error { logger.Info(lc.Context.GetRuntimeContext(), "config stop", "begin", "removing", removedFlag) if err := lc.PluginRunner.Stop(removedFlag); err != nil { return err } logger.Info(lc.Context.GetRuntimeContext(), "Plugin Runner stop", "done") logger.Info(lc.Context.GetRuntimeContext(), "config stop", "success") return nil } const ( rawStringKey = "content" defaultTagPrefix = "__tag__:__prefix__" ) var ( tagDelimiter = []byte("^^^") tagSeparator = []byte("~=~") ) // extractTags extracts tags from rawTags and append them into log. // Rule: k1~=~v1^^^k2~=~v2 // rawTags func extractTags(rawTags []byte, log *protocol.Log) { defaultPrefixIndex := 0 for len(rawTags) != 0 { idx := bytes.Index(rawTags, tagDelimiter) var part []byte if idx < 0 { part = rawTags rawTags = rawTags[len(rawTags):] } else { part = rawTags[:idx] rawTags = rawTags[idx+len(tagDelimiter):] } if len(part) > 0 { pos := bytes.Index(part, tagSeparator) if pos > 0 { log.Contents = append(log.Contents, &protocol.Log_Content{ Key: string(part[:pos]), Value: string(part[pos+len(tagSeparator):]), }) } else { log.Contents = append(log.Contents, &protocol.Log_Content{ Key: defaultTagPrefix + strconv.Itoa(defaultPrefixIndex), Value: string(part), }) } defaultPrefixIndex++ } } } // extractTagsToLogTags extracts tags from rawTags and append them into []*protocol.LogTag. // Rule: k1~=~v1^^^k2~=~v2 // rawTags func extractTagsToLogTags(rawTags []byte) []*protocol.LogTag { logTags := []*protocol.LogTag{} defaultPrefixIndex := 0 for len(rawTags) != 0 { idx := bytes.Index(rawTags, tagDelimiter) var part []byte if idx < 0 { part = rawTags rawTags = rawTags[len(rawTags):] } else { part = rawTags[:idx] rawTags = rawTags[idx+len(tagDelimiter):] } if len(part) > 0 { pos := bytes.Index(part, tagSeparator) if pos > 0 { logTags = append(logTags, &protocol.LogTag{ Key: string(part[:pos]), Value: string(part[pos+len(tagSeparator):]), }) } else { logTags = append(logTags, &protocol.LogTag{ Key: defaultTagPrefix + strconv.Itoa(defaultPrefixIndex), Value: string(part), }) } defaultPrefixIndex++ } } return logTags } // ProcessRawLogV2 ... // V1 -> V2: enable topic field, and use tags field to pass more tags. // unsafe parameter: rawLog,packID and tags // safe parameter: topic func (lc *LogstoreConfig) ProcessRawLogV2(rawLog []byte, packID string, topic string, tags []byte) int { log := &protocol.Log{ Contents: make([]*protocol.Log_Content, 0, 16), } log.Contents = append(log.Contents, &protocol.Log_Content{Key: rawStringKey, Value: string(rawLog)}) if len(topic) > 0 { log.Contents = append(log.Contents, &protocol.Log_Content{Key: "__log_topic__", Value: topic}) } // When UsingOldContentTag is set to false, the tag is now put into the context during cgo. if !lc.GlobalConfig.UsingOldContentTag { logTags := extractTagsToLogTags(tags) lc.PluginRunner.ReceiveRawLog(&pipeline.LogWithContext{Log: log, Context: map[string]interface{}{"source": packID, "topic": topic, "tags": logTags}}) } else { extractTags(tags, log) lc.PluginRunner.ReceiveRawLog(&pipeline.LogWithContext{Log: log, Context: map[string]interface{}{"source": packID, "topic": topic}}) } return 0 } func (lc *LogstoreConfig) ProcessLog(logByte []byte, packID string, topic string, tags []byte) int { log := &protocol.Log{} err := log.Unmarshal(logByte) if err != nil { logger.Error(lc.Context.GetRuntimeContext(), "WRONG_PROTOBUF_ALARM", "cannot process logs passed by core, err", err) return -1 } if len(topic) > 0 { log.Contents = append(log.Contents, &protocol.Log_Content{Key: "__log_topic__", Value: topic}) } // When UsingOldContentTag is set to false, the tag is now put into the context during cgo. if !lc.GlobalConfig.UsingOldContentTag { logTags := extractTagsToLogTags(tags) lc.PluginRunner.ReceiveRawLog(&pipeline.LogWithContext{Log: log, Context: map[string]interface{}{"source": packID, "topic": topic, "tags": logTags}}) } else { extractTags(tags, log) lc.PluginRunner.ReceiveRawLog(&pipeline.LogWithContext{Log: log, Context: map[string]interface{}{"source": packID, "topic": topic}}) } return 0 } func (lc *LogstoreConfig) ProcessLogGroup(logByte []byte, packID string) int { logGroup := &protocol.LogGroup{} err := logGroup.Unmarshal(logByte) if err != nil { logger.Error(lc.Context.GetRuntimeContext(), "WRONG_PROTOBUF_ALARM", "cannot process log group passed by core, err", err) return -1 } lc.PluginRunner.ReceiveLogGroup(pipeline.LogGroupWithContext{ LogGroup: logGroup, Context: map[string]interface{}{ctxKeySource: packID}}, ) return 0 } func hasDockerStdoutInput(plugins map[string]interface{}) bool { inputs, exists := plugins["inputs"] if !exists { return false } inputList, valid := inputs.([]interface{}) if !valid { return false } for _, detail := range inputList { cfg, valid := detail.(map[string]interface{}) if !valid { continue } pluginTypeWithID, valid := cfg["type"] if !valid { continue } if val, valid := pluginTypeWithID.(string); valid { pluginType := getPluginType(val) if pluginType == input.ServiceDockerStdoutPluginName { return true } } } return false } func createLogstoreConfig(project string, logstore string, configName string, logstoreKey int64, jsonStr string) (*LogstoreConfig, error) { var err error contextImp := &ContextImp{} contextImp.InitContext(project, logstore, configName) logstoreC := &LogstoreConfig{ ProjectName: project, LogstoreName: logstore, ConfigName: config.GetRealConfigName(configName), ConfigNameWithSuffix: configName, LogstoreKey: logstoreKey, Context: contextImp, configDetailHash: fmt.Sprintf("%x", md5.Sum([]byte(jsonStr))), //nolint:gosec } contextImp.logstoreC = logstoreC var plugins = make(map[string]interface{}) if err = json.Unmarshal([]byte(jsonStr), &plugins); err != nil { return nil, err } logstoreC.Version = fetchPluginVersion(plugins) if logstoreC.PluginRunner, err = initPluginRunner(logstoreC); err != nil { return nil, err } if lastConfigRunner, hasLastConfig := LastUnsendBuffer[configName]; hasLastConfig { // Move unsent LogGroups from last config to new config. logstoreC.PluginRunner.Merge(lastConfigRunner) } logstoreC.ContainerLabelSet = make(map[string]struct{}) logstoreC.EnvSet = make(map[string]struct{}) logstoreC.K8sLabelSet = make(map[string]struct{}) // add env and label set to logstore config inputs, exists := plugins["inputs"] if exists { inputList, valid := inputs.([]interface{}) if valid { for _, detail := range inputList { cfg, valid := detail.(map[string]interface{}) if !valid { continue } pluginTypeWithID, valid := cfg["type"] if !valid { continue } val, valid := pluginTypeWithID.(string) if !valid { continue } pluginType := getPluginType(val) if pluginType == input.ServiceDockerStdoutPluginName || pluginType == input.MetricDocierFilePluginName { configDetail, valid := cfg["detail"] if !valid { continue } detailMap, valid := configDetail.(map[string]interface{}) if !valid { continue } for key, value := range detailMap { lowerKey := strings.ToLower(key) if strings.Contains(lowerKey, "include") || strings.Contains(lowerKey, "exclude") { conditionMap, valid := value.(map[string]interface{}) if !valid { continue } if strings.Contains(lowerKey, "k8slabel") { for key := range conditionMap { logstoreC.K8sLabelSet[key] = struct{}{} } } else if strings.Contains(lowerKey, "label") { for key := range conditionMap { logstoreC.ContainerLabelSet[key] = struct{}{} } } if strings.Contains(lowerKey, "env") { for key := range conditionMap { logstoreC.EnvSet[key] = struct{}{} } } } if strings.Contains(lowerKey, "collectcontainersflag") { collectContainersFlag, valid := value.(bool) if !valid { continue } logstoreC.CollectingContainersMeta = collectContainersFlag } else if strings.Contains(lowerKey, "collectingcontainersmeta") { collectingContainersMeta, valid := value.(bool) if !valid { continue } logstoreC.CollectingContainersMeta = collectingContainersMeta } } } } } } logstoreC.GlobalConfig = &config.LoongcollectorGlobalConfig // If plugins config has "global" field, then override the logstoreC.GlobalConfig if pluginConfigInterface, flag := plugins["global"]; flag { pluginConfig := &config.GlobalConfig{} *pluginConfig = config.LoongcollectorGlobalConfig if flag { configJSONStr, err := json.Marshal(pluginConfigInterface) //nolint:govet if err != nil { return nil, err } err = json.Unmarshal(configJSONStr, &pluginConfig) if err != nil { return nil, err } pluginConfig.AppendingAllEnvMetaTag = false if pluginConfigMap, ok := pluginConfigInterface.(map[string]interface{}); ok { if _, ok := pluginConfigMap["AgentEnvMetaTagKey"]; !ok { pluginConfig.AppendingAllEnvMetaTag = true } } } logstoreC.GlobalConfig = pluginConfig if logstoreC.GlobalConfig.PipelineMetaTagKey == nil { logstoreC.GlobalConfig.PipelineMetaTagKey = make(map[string]string) } if logstoreC.GlobalConfig.AgentEnvMetaTagKey == nil { logstoreC.GlobalConfig.AgentEnvMetaTagKey = make(map[string]string) } logger.Debug(contextImp.GetRuntimeContext(), "load plugin config", *logstoreC.GlobalConfig) } logQueueSize := logstoreC.GlobalConfig.DefaultLogQueueSize // Because the transferred data of the file MixProcessMode is quite large, we have to limit queue size to control memory usage here. if checkMixProcessMode(plugins) == file { logger.Infof(contextImp.GetRuntimeContext(), "no inputs in config %v, maybe file input, limit queue size", configName) logQueueSize = 10 } logGroupSize := logstoreC.GlobalConfig.DefaultLogGroupQueueSize if err = logstoreC.PluginRunner.Init(logQueueSize, logGroupSize); err != nil { return nil, err } // extensions should be initialized first pluginConfig, ok := plugins["extensions"] if ok { extensions, extensionsFound := pluginConfig.([]interface{}) if !extensionsFound { return nil, fmt.Errorf("invalid extension type: %s, not json array", "extensions") } for _, extensionInterface := range extensions { extension, ok := extensionInterface.(map[string]interface{}) if !ok { return nil, fmt.Errorf("invalid extension type") } if pluginTypeWithID, ok := extension["type"]; ok { pluginTypeWithIDStr, ok := pluginTypeWithID.(string) if !ok { return nil, fmt.Errorf("invalid extension type") } pluginType := getPluginType(pluginTypeWithIDStr) logger.Debug(contextImp.GetRuntimeContext(), "add extension", pluginType) err = loadExtension(logstoreC.genPluginMeta(pluginTypeWithIDStr), logstoreC, extension["detail"]) if err != nil { return nil, err } contextImp.AddPlugin(pluginType) } } } pluginConfig, inputsFound := plugins["inputs"] if inputsFound { inputs, ok := pluginConfig.([]interface{}) if ok { for _, inputInterface := range inputs { input, ok := inputInterface.(map[string]interface{}) if ok { if pluginTypeWithID, ok := input["type"]; ok { if pluginTypeWithIDStr, ok := pluginTypeWithID.(string); ok { pluginType := getPluginType(pluginTypeWithIDStr) if _, isMetricInput := pipeline.MetricInputs[pluginType]; isMetricInput { // Load MetricInput plugin defined in pipeline.MetricInputs // pipeline.MetricInputs will be renamed in a future version err = loadMetric(logstoreC.genPluginMeta(pluginTypeWithIDStr), logstoreC, input["detail"]) } else if _, isServiceInput := pipeline.ServiceInputs[pluginType]; isServiceInput { // Load ServiceInput plugin defined in pipeline.ServiceInputs err = loadService(logstoreC.genPluginMeta(pluginTypeWithIDStr), logstoreC, input["detail"]) } if err != nil { return nil, err } contextImp.AddPlugin(pluginType) continue } } } return nil, fmt.Errorf("invalid input type") } } else { return nil, fmt.Errorf("invalid inputs type : %s, not json array", "inputs") } } pluginConfig, processorsFound := plugins["processors"] if processorsFound { processors, ok := pluginConfig.([]interface{}) if ok { for i, processorInterface := range processors { processor, ok := processorInterface.(map[string]interface{}) if ok { if pluginTypeWithID, ok := processor["type"]; ok { if pluginTypeWithIDStr, ok := pluginTypeWithID.(string); ok { pluginType := getPluginType(pluginTypeWithIDStr) logger.Debug(contextImp.GetRuntimeContext(), "add processor", pluginType) err = loadProcessor(logstoreC.genPluginMeta(pluginTypeWithIDStr), i, logstoreC, processor["detail"]) if err != nil { return nil, err } contextImp.AddPlugin(pluginType) continue } } } return nil, fmt.Errorf("invalid processor type") } } else { return nil, fmt.Errorf("invalid processors type : %s, not json array", "processors") } } pluginConfig, aggregatorsFound := plugins["aggregators"] if aggregatorsFound { aggregators, ok := pluginConfig.([]interface{}) if ok { for _, aggregatorInterface := range aggregators { aggregator, ok := aggregatorInterface.(map[string]interface{}) if ok { if pluginTypeWithID, ok := aggregator["type"]; ok { if pluginTypeWithIDStr, ok := pluginTypeWithID.(string); ok { pluginType := getPluginType(pluginTypeWithIDStr) logger.Debug(contextImp.GetRuntimeContext(), "add aggregator", pluginType) err = loadAggregator(logstoreC.genPluginMeta(pluginTypeWithIDStr), logstoreC, aggregator["detail"]) if err != nil { return nil, err } contextImp.AddPlugin(pluginType) continue } } } return nil, fmt.Errorf("invalid aggregator type") } } else { return nil, fmt.Errorf("invalid aggregator type : %s, not json array", "aggregators") } } if err = logstoreC.PluginRunner.AddDefaultAggregatorIfEmpty(); err != nil { return nil, err } pluginConfig, flushersFound := plugins["flushers"] if flushersFound { flushers, ok := pluginConfig.([]interface{}) if ok { for _, flusherInterface := range flushers { flusher, ok := flusherInterface.(map[string]interface{}) if ok { if pluginTypeWithID, ok := flusher["type"]; ok { if pluginTypeWithIDStr, ok := pluginTypeWithID.(string); ok { pluginType := getPluginType(pluginTypeWithIDStr) logger.Debug(contextImp.GetRuntimeContext(), "add flusher", pluginType) err = loadFlusher(logstoreC.genPluginMeta(pluginTypeWithIDStr), logstoreC, flusher["detail"]) if err != nil { return nil, err } contextImp.AddPlugin(pluginType) continue } } } return nil, fmt.Errorf("invalid flusher type") } } else { return nil, fmt.Errorf("invalid flusher type : %s, not json array", "flushers") } } if err = logstoreC.PluginRunner.AddDefaultFlusherIfEmpty(); err != nil { return nil, err } return logstoreC, nil } func fetchPluginVersion(config map[string]interface{}) ConfigVersion { if v, ok := config["global"]; ok { if global, ok := v.(map[string]interface{}); ok { if version, ok := global["StructureType"]; ok { if str, ok := version.(string); ok { return ConfigVersion(strings.ToLower(str)) } } } } return v1 } func initPluginRunner(lc *LogstoreConfig) (PluginRunner, error) { switch lc.Version { case v1: return &pluginv1Runner{ LogstoreConfig: lc, FlushOutStore: NewFlushOutStore[protocol.LogGroup](), }, nil case v2: return &pluginv2Runner{ LogstoreConfig: lc, FlushOutStore: NewFlushOutStore[models.PipelineGroupEvents](), }, nil default: return nil, fmt.Errorf("undefined config version %s", lc.Version) } } func LoadLogstoreConfig(project string, logstore string, configName string, logstoreKey int64, jsonStr string) error { if len(jsonStr) == 0 { logger.Info(context.Background(), "delete config", configName, "logstore", logstore) DeleteLogstoreConfigFromLogtailConfig(configName, true) return nil } logger.Info(context.Background(), "load config", configName, "logstore", logstore) logstoreC, err := createLogstoreConfig(project, logstore, configName, logstoreKey, jsonStr) if err != nil { return err } if logstoreC.PluginRunner.IsWithInputPlugin() { ToStartPipelineConfigWithInput = logstoreC } else { ToStartPipelineConfigWithoutInput = logstoreC } return nil } func UnloadPartiallyLoadedConfig(configName string) error { logger.Info(context.Background(), "unload config", configName) if ToStartPipelineConfigWithInput.ConfigNameWithSuffix == configName { ToStartPipelineConfigWithInput = nil return nil } if ToStartPipelineConfigWithoutInput.ConfigNameWithSuffix == configName { ToStartPipelineConfigWithoutInput = nil return nil } logger.Error(context.Background(), "unload config", "config not found", configName) return fmt.Errorf("config not found") } func loadBuiltinConfig(name string, project string, logstore string, configName string, cfgStr string) (*LogstoreConfig, error) { logger.Infof(context.Background(), "load built-in config %v, config name: %v, logstore: %v", name, configName, logstore) return createLogstoreConfig(project, logstore, configName, -1, cfgStr) } // loadMetric creates a metric plugin object and append to logstoreConfig.MetricPlugins. // @pluginType: the type of metric plugin. // @logstoreConfig: where to store the created metric plugin object. // It returns any error encountered. func loadMetric(pluginMeta *pipeline.PluginMeta, logstoreConfig *LogstoreConfig, configInterface interface{}) (err error) { creator, existFlag := pipeline.MetricInputs[pluginMeta.PluginType] if !existFlag || creator == nil { return fmt.Errorf("can't find plugin %s", pluginMeta.PluginType) } metric := creator() if err = applyPluginConfig(metric, configInterface); err != nil { return err } interval := logstoreConfig.GlobalConfig.InputIntervalMs configMapI, convertSuc := configInterface.(map[string]interface{}) if convertSuc { valI, keyExist := configMapI["IntervalMs"] if keyExist { if val, convSuc := valI.(float64); convSuc { interval = int(val) } } } return logstoreConfig.PluginRunner.AddPlugin(pluginMeta, pluginMetricInput, metric, map[string]interface{}{"interval": interval}) } // loadService creates a service plugin object and append to logstoreConfig.ServicePlugins. // @pluginType: the type of service plugin. // @logstoreConfig: where to store the created service plugin object. // It returns any error encountered. func loadService(pluginMeta *pipeline.PluginMeta, logstoreConfig *LogstoreConfig, configInterface interface{}) (err error) { creator, existFlag := pipeline.ServiceInputs[pluginMeta.PluginType] if !existFlag || creator == nil { return fmt.Errorf("can't find plugin %s", pluginMeta.PluginType) } service := creator() if err = applyPluginConfig(service, configInterface); err != nil { return err } return logstoreConfig.PluginRunner.AddPlugin(pluginMeta, pluginServiceInput, service, map[string]interface{}{}) } func loadProcessor(pluginMeta *pipeline.PluginMeta, priority int, logstoreConfig *LogstoreConfig, configInterface interface{}) (err error) { creator, existFlag := pipeline.Processors[pluginMeta.PluginType] if !existFlag || creator == nil { logger.Error(logstoreConfig.Context.GetRuntimeContext(), "INVALID_PROCESSOR_TYPE", "invalid processor type, maybe type is wrong or logtail version is too old", pluginMeta.PluginType) return nil } processor := creator() if err = applyPluginConfig(processor, configInterface); err != nil { return err } return logstoreConfig.PluginRunner.AddPlugin(pluginMeta, pluginProcessor, processor, map[string]interface{}{"priority": priority}) } func loadAggregator(pluginMeta *pipeline.PluginMeta, logstoreConfig *LogstoreConfig, configInterface interface{}) (err error) { creator, existFlag := pipeline.Aggregators[pluginMeta.PluginType] if !existFlag || creator == nil { logger.Error(logstoreConfig.Context.GetRuntimeContext(), "INVALID_AGGREGATOR_TYPE", "invalid aggregator type, maybe type is wrong or logtail version is too old", pluginMeta.PluginType) return nil } aggregator := creator() if err = applyPluginConfig(aggregator, configInterface); err != nil { return err } return logstoreConfig.PluginRunner.AddPlugin(pluginMeta, pluginAggregator, aggregator, map[string]interface{}{}) } func loadFlusher(pluginMeta *pipeline.PluginMeta, logstoreConfig *LogstoreConfig, configInterface interface{}) (err error) { creator, existFlag := pipeline.Flushers[pluginMeta.PluginType] if !existFlag || creator == nil { return fmt.Errorf("can't find plugin %s", pluginMeta.PluginType) } flusher := creator() if err = applyPluginConfig(flusher, configInterface); err != nil { return err } return logstoreConfig.PluginRunner.AddPlugin(pluginMeta, pluginFlusher, flusher, map[string]interface{}{}) } func loadExtension(pluginMeta *pipeline.PluginMeta, logstoreConfig *LogstoreConfig, configInterface interface{}) (err error) { creator, existFlag := pipeline.Extensions[pluginMeta.PluginType] if !existFlag || creator == nil { return fmt.Errorf("can't find plugin %s", pluginMeta.PluginType) } extension := creator() if err = applyPluginConfig(extension, configInterface); err != nil { return err } if err = extension.Init(logstoreConfig.Context); err != nil { return err } return logstoreConfig.PluginRunner.AddPlugin(pluginMeta, pluginExtension, extension, map[string]interface{}{}) } func applyPluginConfig(plugin interface{}, pluginConfig interface{}) error { config, err := json.Marshal(pluginConfig) if err != nil { return err } err = json.Unmarshal(config, plugin) return err } // Rule: pluginTypeWithID=pluginType/pluginID#pluginPriority. func getPluginType(pluginTypeWithID string) string { if ids := strings.IndexByte(pluginTypeWithID, '/'); ids != -1 { return pluginTypeWithID[:ids] } return pluginTypeWithID } func (lc *LogstoreConfig) genPluginMeta(pluginTypeWithID string) *pipeline.PluginMeta { if isPluginTypeWithID(pluginTypeWithID) { pluginTypeWithID := pluginTypeWithID if idx := strings.IndexByte(pluginTypeWithID, '#'); idx != -1 { pluginTypeWithID = pluginTypeWithID[:idx] } if ids := strings.IndexByte(pluginTypeWithID, '/'); ids != -1 { if pluginID, err := strconv.ParseInt(pluginTypeWithID[ids+1:], 10, 32); err == nil { atomic.StoreInt32(&lc.pluginID, int32(pluginID)) } return &pipeline.PluginMeta{ PluginTypeWithID: getPluginTypeWithID(pluginTypeWithID), PluginType: getPluginType(pluginTypeWithID), PluginID: getPluginID(pluginTypeWithID), } } } pluginType := pluginTypeWithID pluginID := lc.genPluginID() pluginTypeWithID = fmt.Sprintf("%s/%s", pluginType, pluginID) return &pipeline.PluginMeta{ PluginTypeWithID: getPluginTypeWithID(pluginTypeWithID), PluginType: getPluginType(pluginTypeWithID), PluginID: getPluginID(pluginTypeWithID), } } func isPluginTypeWithID(pluginTypeWithID string) bool { if idx := strings.IndexByte(pluginTypeWithID, '/'); idx != -1 { return true } return false } func getPluginID(pluginTypeWithID string) string { slashCount := strings.Count(pluginTypeWithID, "/") switch slashCount { case 0: return "" case 1: if idx := strings.IndexByte(pluginTypeWithID, '/'); idx != -1 { return pluginTypeWithID[idx+1:] } default: if firstIdx := strings.IndexByte(pluginTypeWithID, '/'); firstIdx != -1 { if lastIdx := strings.LastIndexByte(pluginTypeWithID, '/'); lastIdx != -1 { return pluginTypeWithID[firstIdx+1 : lastIdx] } } } return "" } func getPluginTypeWithID(pluginTypeWithID string) string { return fmt.Sprintf("%s/%s", getPluginType(pluginTypeWithID), getPluginID(pluginTypeWithID)) } func GetPluginPriority(pluginTypeWithID string) int { if idx := strings.IndexByte(pluginTypeWithID, '#'); idx != -1 { val, err := strconv.Atoi(pluginTypeWithID[idx+1:]) if err != nil { return 0 } return val } return 0 } func (lc *LogstoreConfig) genPluginID() string { return fmt.Sprintf("%v", atomic.AddInt32(&lc.pluginID, 1)) } func init() { LogtailConfigLock.Lock() LogtailConfig = make(map[string]*LogstoreConfig) LogtailConfigLock.Unlock() }