func createLogstoreConfig()

in pluginmanager/logstore_config.go [306:598]


func createLogstoreConfig(project string, logstore string, configName string, logstoreKey int64, jsonStr string) (*LogstoreConfig, error) {
	var err error
	contextImp := &ContextImp{}
	contextImp.InitContext(project, logstore, configName)
	logstoreC := &LogstoreConfig{
		ProjectName:          project,
		LogstoreName:         logstore,
		ConfigName:           config.GetRealConfigName(configName),
		ConfigNameWithSuffix: configName,
		LogstoreKey:          logstoreKey,
		Context:              contextImp,
		configDetailHash:     fmt.Sprintf("%x", md5.Sum([]byte(jsonStr))), //nolint:gosec
	}
	contextImp.logstoreC = logstoreC

	var plugins = make(map[string]interface{})
	if err = json.Unmarshal([]byte(jsonStr), &plugins); err != nil {
		return nil, err
	}

	logstoreC.Version = fetchPluginVersion(plugins)
	if logstoreC.PluginRunner, err = initPluginRunner(logstoreC); err != nil {
		return nil, err
	}
	if lastConfigRunner, hasLastConfig := LastUnsendBuffer[configName]; hasLastConfig {
		// Move unsent LogGroups from last config to new config.
		logstoreC.PluginRunner.Merge(lastConfigRunner)
	}

	logstoreC.ContainerLabelSet = make(map[string]struct{})
	logstoreC.EnvSet = make(map[string]struct{})
	logstoreC.K8sLabelSet = make(map[string]struct{})
	// add env and label set to logstore config
	inputs, exists := plugins["inputs"]
	if exists {
		inputList, valid := inputs.([]interface{})
		if valid {
			for _, detail := range inputList {
				cfg, valid := detail.(map[string]interface{})
				if !valid {
					continue
				}
				pluginTypeWithID, valid := cfg["type"]
				if !valid {
					continue
				}
				val, valid := pluginTypeWithID.(string)
				if !valid {
					continue
				}
				pluginType := getPluginType(val)
				if pluginType == input.ServiceDockerStdoutPluginName || pluginType == input.MetricDocierFilePluginName {
					configDetail, valid := cfg["detail"]
					if !valid {
						continue
					}
					detailMap, valid := configDetail.(map[string]interface{})
					if !valid {
						continue
					}
					for key, value := range detailMap {
						lowerKey := strings.ToLower(key)
						if strings.Contains(lowerKey, "include") || strings.Contains(lowerKey, "exclude") {
							conditionMap, valid := value.(map[string]interface{})
							if !valid {
								continue
							}
							if strings.Contains(lowerKey, "k8slabel") {
								for key := range conditionMap {
									logstoreC.K8sLabelSet[key] = struct{}{}
								}
							} else if strings.Contains(lowerKey, "label") {
								for key := range conditionMap {
									logstoreC.ContainerLabelSet[key] = struct{}{}
								}
							}
							if strings.Contains(lowerKey, "env") {
								for key := range conditionMap {
									logstoreC.EnvSet[key] = struct{}{}
								}
							}
						}
						if strings.Contains(lowerKey, "collectcontainersflag") {
							collectContainersFlag, valid := value.(bool)
							if !valid {
								continue
							}
							logstoreC.CollectingContainersMeta = collectContainersFlag
						} else if strings.Contains(lowerKey, "collectingcontainersmeta") {
							collectingContainersMeta, valid := value.(bool)
							if !valid {
								continue
							}
							logstoreC.CollectingContainersMeta = collectingContainersMeta
						}
					}
				}
			}
		}
	}

	logstoreC.GlobalConfig = &config.LoongcollectorGlobalConfig
	// If plugins config has "global" field, then override the logstoreC.GlobalConfig
	if pluginConfigInterface, flag := plugins["global"]; flag {
		pluginConfig := &config.GlobalConfig{}
		*pluginConfig = config.LoongcollectorGlobalConfig
		if flag {
			configJSONStr, err := json.Marshal(pluginConfigInterface) //nolint:govet
			if err != nil {
				return nil, err
			}
			err = json.Unmarshal(configJSONStr, &pluginConfig)
			if err != nil {
				return nil, err
			}
			pluginConfig.AppendingAllEnvMetaTag = false
			if pluginConfigMap, ok := pluginConfigInterface.(map[string]interface{}); ok {
				if _, ok := pluginConfigMap["AgentEnvMetaTagKey"]; !ok {
					pluginConfig.AppendingAllEnvMetaTag = true
				}
			}
		}
		logstoreC.GlobalConfig = pluginConfig
		if logstoreC.GlobalConfig.PipelineMetaTagKey == nil {
			logstoreC.GlobalConfig.PipelineMetaTagKey = make(map[string]string)
		}
		if logstoreC.GlobalConfig.AgentEnvMetaTagKey == nil {
			logstoreC.GlobalConfig.AgentEnvMetaTagKey = make(map[string]string)
		}
		logger.Debug(contextImp.GetRuntimeContext(), "load plugin config", *logstoreC.GlobalConfig)
	}

	logQueueSize := logstoreC.GlobalConfig.DefaultLogQueueSize
	// Because the transferred data of the file MixProcessMode is quite large, we have to limit queue size to control memory usage here.
	if checkMixProcessMode(plugins) == file {
		logger.Infof(contextImp.GetRuntimeContext(), "no inputs in config %v, maybe file input, limit queue size", configName)
		logQueueSize = 10
	}

	logGroupSize := logstoreC.GlobalConfig.DefaultLogGroupQueueSize

	if err = logstoreC.PluginRunner.Init(logQueueSize, logGroupSize); err != nil {
		return nil, err
	}

	// extensions should be initialized first
	pluginConfig, ok := plugins["extensions"]
	if ok {
		extensions, extensionsFound := pluginConfig.([]interface{})
		if !extensionsFound {
			return nil, fmt.Errorf("invalid extension type: %s, not json array", "extensions")
		}
		for _, extensionInterface := range extensions {
			extension, ok := extensionInterface.(map[string]interface{})
			if !ok {
				return nil, fmt.Errorf("invalid extension type")
			}
			if pluginTypeWithID, ok := extension["type"]; ok {
				pluginTypeWithIDStr, ok := pluginTypeWithID.(string)
				if !ok {
					return nil, fmt.Errorf("invalid extension type")
				}
				pluginType := getPluginType(pluginTypeWithIDStr)
				logger.Debug(contextImp.GetRuntimeContext(), "add extension", pluginType)
				err = loadExtension(logstoreC.genPluginMeta(pluginTypeWithIDStr), logstoreC, extension["detail"])
				if err != nil {
					return nil, err
				}
				contextImp.AddPlugin(pluginType)
			}
		}
	}

	pluginConfig, inputsFound := plugins["inputs"]
	if inputsFound {
		inputs, ok := pluginConfig.([]interface{})
		if ok {
			for _, inputInterface := range inputs {
				input, ok := inputInterface.(map[string]interface{})
				if ok {
					if pluginTypeWithID, ok := input["type"]; ok {
						if pluginTypeWithIDStr, ok := pluginTypeWithID.(string); ok {
							pluginType := getPluginType(pluginTypeWithIDStr)
							if _, isMetricInput := pipeline.MetricInputs[pluginType]; isMetricInput {
								// Load MetricInput plugin defined in pipeline.MetricInputs
								// pipeline.MetricInputs will be renamed in a future version
								err = loadMetric(logstoreC.genPluginMeta(pluginTypeWithIDStr), logstoreC, input["detail"])
							} else if _, isServiceInput := pipeline.ServiceInputs[pluginType]; isServiceInput {
								// Load ServiceInput plugin defined in pipeline.ServiceInputs
								err = loadService(logstoreC.genPluginMeta(pluginTypeWithIDStr), logstoreC, input["detail"])
							}
							if err != nil {
								return nil, err
							}
							contextImp.AddPlugin(pluginType)
							continue
						}
					}
				}
				return nil, fmt.Errorf("invalid input type")
			}
		} else {
			return nil, fmt.Errorf("invalid inputs type : %s, not json array", "inputs")
		}
	}
	pluginConfig, processorsFound := plugins["processors"]
	if processorsFound {
		processors, ok := pluginConfig.([]interface{})
		if ok {
			for i, processorInterface := range processors {
				processor, ok := processorInterface.(map[string]interface{})
				if ok {
					if pluginTypeWithID, ok := processor["type"]; ok {
						if pluginTypeWithIDStr, ok := pluginTypeWithID.(string); ok {
							pluginType := getPluginType(pluginTypeWithIDStr)
							logger.Debug(contextImp.GetRuntimeContext(), "add processor", pluginType)
							err = loadProcessor(logstoreC.genPluginMeta(pluginTypeWithIDStr), i, logstoreC, processor["detail"])
							if err != nil {
								return nil, err
							}
							contextImp.AddPlugin(pluginType)
							continue
						}
					}
				}
				return nil, fmt.Errorf("invalid processor type")
			}
		} else {
			return nil, fmt.Errorf("invalid processors type : %s, not json array", "processors")
		}
	}

	pluginConfig, aggregatorsFound := plugins["aggregators"]
	if aggregatorsFound {
		aggregators, ok := pluginConfig.([]interface{})
		if ok {
			for _, aggregatorInterface := range aggregators {
				aggregator, ok := aggregatorInterface.(map[string]interface{})
				if ok {
					if pluginTypeWithID, ok := aggregator["type"]; ok {
						if pluginTypeWithIDStr, ok := pluginTypeWithID.(string); ok {
							pluginType := getPluginType(pluginTypeWithIDStr)
							logger.Debug(contextImp.GetRuntimeContext(), "add aggregator", pluginType)
							err = loadAggregator(logstoreC.genPluginMeta(pluginTypeWithIDStr), logstoreC, aggregator["detail"])
							if err != nil {
								return nil, err
							}
							contextImp.AddPlugin(pluginType)
							continue
						}
					}
				}
				return nil, fmt.Errorf("invalid aggregator type")
			}
		} else {
			return nil, fmt.Errorf("invalid aggregator type : %s, not json array", "aggregators")
		}
	}
	if err = logstoreC.PluginRunner.AddDefaultAggregatorIfEmpty(); err != nil {
		return nil, err
	}

	pluginConfig, flushersFound := plugins["flushers"]
	if flushersFound {
		flushers, ok := pluginConfig.([]interface{})
		if ok {
			for _, flusherInterface := range flushers {
				flusher, ok := flusherInterface.(map[string]interface{})
				if ok {
					if pluginTypeWithID, ok := flusher["type"]; ok {
						if pluginTypeWithIDStr, ok := pluginTypeWithID.(string); ok {
							pluginType := getPluginType(pluginTypeWithIDStr)
							logger.Debug(contextImp.GetRuntimeContext(), "add flusher", pluginType)
							err = loadFlusher(logstoreC.genPluginMeta(pluginTypeWithIDStr), logstoreC, flusher["detail"])
							if err != nil {
								return nil, err
							}
							contextImp.AddPlugin(pluginType)
							continue
						}
					}
				}
				return nil, fmt.Errorf("invalid flusher type")
			}
		} else {
			return nil, fmt.Errorf("invalid flusher type : %s, not json array", "flushers")
		}
	}
	if err = logstoreC.PluginRunner.AddDefaultFlusherIfEmpty(); err != nil {
		return nil, err
	}
	return logstoreC, nil
}