in pluginmanager/logstore_config.go [306:598]
// createLogstoreConfig builds a LogstoreConfig from the JSON pipeline
// description in jsonStr.
//
// The work is done in this order:
//  1. Create the context and the LogstoreConfig shell; configDetailHash is the
//     hex MD5 of jsonStr.
//  2. Decode jsonStr into a generic map, derive the config version and create
//     the plugin runner. If LastUnsendBuffer holds a runner for configName,
//     its pending data is merged into the new runner.
//  3. Collect the container label / env keys referenced by container inputs.
//  4. Resolve the global config (process-wide defaults, optionally overridden
//     by the "global" section) and initialise the runner queues.
//  5. Load plugins section by section: extensions, inputs, processors,
//     aggregators, flushers. A default aggregator and a default flusher are
//     added when the config declares none.
//
// It returns an error when jsonStr is not a JSON object, when a plugin section
// is not a JSON array, when a plugin entry is malformed, or when loading any
// plugin fails. On error the returned config is nil.
func createLogstoreConfig(project string, logstore string, configName string, logstoreKey int64, jsonStr string) (*LogstoreConfig, error) {
	var err error
	contextImp := &ContextImp{}
	contextImp.InitContext(project, logstore, configName)
	logstoreC := &LogstoreConfig{
		ProjectName:          project,
		LogstoreName:         logstore,
		ConfigName:           config.GetRealConfigName(configName),
		ConfigNameWithSuffix: configName,
		LogstoreKey:          logstoreKey,
		Context:              contextImp,
		configDetailHash:     fmt.Sprintf("%x", md5.Sum([]byte(jsonStr))), //nolint:gosec
	}
	contextImp.logstoreC = logstoreC

	var plugins = make(map[string]interface{})
	if err = json.Unmarshal([]byte(jsonStr), &plugins); err != nil {
		return nil, err
	}
	logstoreC.Version = fetchPluginVersion(plugins)
	if logstoreC.PluginRunner, err = initPluginRunner(logstoreC); err != nil {
		return nil, err
	}
	if lastConfigRunner, hasLastConfig := LastUnsendBuffer[configName]; hasLastConfig {
		// Move unsent LogGroups from last config to new config.
		logstoreC.PluginRunner.Merge(lastConfigRunner)
	}

	// add env and label set to logstore config
	collectContainerFilterKeys(logstoreC, plugins)

	logstoreC.GlobalConfig = &config.LoongcollectorGlobalConfig
	// If plugins config has "global" field, then override the logstoreC.GlobalConfig
	if rawGlobal, hasGlobal := plugins["global"]; hasGlobal {
		if logstoreC.GlobalConfig, err = overrideGlobalConfig(rawGlobal); err != nil {
			return nil, err
		}
		logger.Debug(contextImp.GetRuntimeContext(), "load plugin config", *logstoreC.GlobalConfig)
	}

	logQueueSize := logstoreC.GlobalConfig.DefaultLogQueueSize
	// Because the transferred data of the file MixProcessMode is quite large, we have to limit queue size to control memory usage here.
	if checkMixProcessMode(plugins) == file {
		logger.Infof(contextImp.GetRuntimeContext(), "no inputs in config %v, maybe file input, limit queue size", configName)
		logQueueSize = 10
	}
	logGroupSize := logstoreC.GlobalConfig.DefaultLogGroupQueueSize
	if err = logstoreC.PluginRunner.Init(logQueueSize, logGroupSize); err != nil {
		return nil, err
	}

	// extensions should be initialized first
	if rawExtensions, found := plugins["extensions"]; found {
		extensions, isArray := rawExtensions.([]interface{})
		if !isArray {
			return nil, fmt.Errorf("invalid extension type: %s, not json array", "extensions")
		}
		for _, extensionInterface := range extensions {
			extension, ok := extensionInterface.(map[string]interface{})
			if !ok {
				return nil, fmt.Errorf("invalid extension type")
			}
			pluginTypeWithID, ok := extension["type"]
			if !ok {
				// Unlike the other sections, an extension entry without a
				// "type" key is skipped instead of rejected.
				continue
			}
			pluginTypeWithIDStr, ok := pluginTypeWithID.(string)
			if !ok {
				return nil, fmt.Errorf("invalid extension type")
			}
			pluginType := getPluginType(pluginTypeWithIDStr)
			logger.Debug(contextImp.GetRuntimeContext(), "add extension", pluginType)
			if err = loadExtension(logstoreC.genPluginMeta(pluginTypeWithIDStr), logstoreC, extension["detail"]); err != nil {
				return nil, err
			}
			contextImp.AddPlugin(pluginType)
		}
	}

	if rawInputs, found := plugins["inputs"]; found {
		inputEntries, isArray := rawInputs.([]interface{})
		if !isArray {
			return nil, fmt.Errorf("invalid inputs type : %s, not json array", "inputs")
		}
		for _, inputInterface := range inputEntries {
			inputCfg, pluginTypeWithIDStr, ok := splitPluginEntry(inputInterface)
			if !ok {
				return nil, fmt.Errorf("invalid input type")
			}
			pluginType := getPluginType(pluginTypeWithIDStr)
			// NOTE(review): a type found in neither registry is not loaded and
			// raises no error, yet it is still recorded through AddPlugin.
			// This matches the previous behavior; confirm it is intended.
			if _, isMetricInput := pipeline.MetricInputs[pluginType]; isMetricInput {
				// Load MetricInput plugin defined in pipeline.MetricInputs
				// pipeline.MetricInputs will be renamed in a future version
				err = loadMetric(logstoreC.genPluginMeta(pluginTypeWithIDStr), logstoreC, inputCfg["detail"])
			} else if _, isServiceInput := pipeline.ServiceInputs[pluginType]; isServiceInput {
				// Load ServiceInput plugin defined in pipeline.ServiceInputs
				err = loadService(logstoreC.genPluginMeta(pluginTypeWithIDStr), logstoreC, inputCfg["detail"])
			}
			if err != nil {
				return nil, err
			}
			contextImp.AddPlugin(pluginType)
		}
	}

	if rawProcessors, found := plugins["processors"]; found {
		processors, isArray := rawProcessors.([]interface{})
		if !isArray {
			return nil, fmt.Errorf("invalid processors type : %s, not json array", "processors")
		}
		for i, processorInterface := range processors {
			processor, pluginTypeWithIDStr, ok := splitPluginEntry(processorInterface)
			if !ok {
				return nil, fmt.Errorf("invalid processor type")
			}
			pluginType := getPluginType(pluginTypeWithIDStr)
			logger.Debug(contextImp.GetRuntimeContext(), "add processor", pluginType)
			// The array index is handed to loadProcessor together with the plugin meta.
			if err = loadProcessor(logstoreC.genPluginMeta(pluginTypeWithIDStr), i, logstoreC, processor["detail"]); err != nil {
				return nil, err
			}
			contextImp.AddPlugin(pluginType)
		}
	}

	if rawAggregators, found := plugins["aggregators"]; found {
		aggregators, isArray := rawAggregators.([]interface{})
		if !isArray {
			return nil, fmt.Errorf("invalid aggregator type : %s, not json array", "aggregators")
		}
		for _, aggregatorInterface := range aggregators {
			aggregator, pluginTypeWithIDStr, ok := splitPluginEntry(aggregatorInterface)
			if !ok {
				return nil, fmt.Errorf("invalid aggregator type")
			}
			pluginType := getPluginType(pluginTypeWithIDStr)
			logger.Debug(contextImp.GetRuntimeContext(), "add aggregator", pluginType)
			if err = loadAggregator(logstoreC.genPluginMeta(pluginTypeWithIDStr), logstoreC, aggregator["detail"]); err != nil {
				return nil, err
			}
			contextImp.AddPlugin(pluginType)
		}
	}
	if err = logstoreC.PluginRunner.AddDefaultAggregatorIfEmpty(); err != nil {
		return nil, err
	}

	if rawFlushers, found := plugins["flushers"]; found {
		flushers, isArray := rawFlushers.([]interface{})
		if !isArray {
			return nil, fmt.Errorf("invalid flusher type : %s, not json array", "flushers")
		}
		for _, flusherInterface := range flushers {
			flusher, pluginTypeWithIDStr, ok := splitPluginEntry(flusherInterface)
			if !ok {
				return nil, fmt.Errorf("invalid flusher type")
			}
			pluginType := getPluginType(pluginTypeWithIDStr)
			logger.Debug(contextImp.GetRuntimeContext(), "add flusher", pluginType)
			if err = loadFlusher(logstoreC.genPluginMeta(pluginTypeWithIDStr), logstoreC, flusher["detail"]); err != nil {
				return nil, err
			}
			contextImp.AddPlugin(pluginType)
		}
	}
	if err = logstoreC.PluginRunner.AddDefaultFlusherIfEmpty(); err != nil {
		return nil, err
	}
	return logstoreC, nil
}

// splitPluginEntry splits one element of a plugin section into its config map
// and its "type" string. ok is false when the element is not a JSON object or
// has no string-valued "type" field.
func splitPluginEntry(entry interface{}) (cfg map[string]interface{}, typeWithID string, ok bool) {
	cfg, ok = entry.(map[string]interface{})
	if !ok {
		return nil, "", false
	}
	// A missing key yields a nil interface, which fails the assertion as well.
	typeWithID, ok = cfg["type"].(string)
	return cfg, typeWithID, ok
}

// collectContainerFilterKeys initialises the ContainerLabelSet, EnvSet and
// K8sLabelSet of logstoreC and fills them from the "inputs" section.
//
// Only inputs whose type is input.ServiceDockerStdoutPluginName or
// input.MetricDocierFilePluginName are inspected. Detail keys are matched
// case-insensitively by substring; malformed entries are skipped silently.
func collectContainerFilterKeys(logstoreC *LogstoreConfig, plugins map[string]interface{}) {
	logstoreC.ContainerLabelSet = make(map[string]struct{})
	logstoreC.EnvSet = make(map[string]struct{})
	logstoreC.K8sLabelSet = make(map[string]struct{})

	inputList, isArray := plugins["inputs"].([]interface{})
	if !isArray {
		return
	}
	for _, entry := range inputList {
		cfg, typeWithID, ok := splitPluginEntry(entry)
		if !ok {
			continue
		}
		pluginType := getPluginType(typeWithID)
		if pluginType != input.ServiceDockerStdoutPluginName && pluginType != input.MetricDocierFilePluginName {
			continue
		}
		detailMap, ok := cfg["detail"].(map[string]interface{})
		if !ok {
			continue
		}
		for key, value := range detailMap {
			lowerKey := strings.ToLower(key)
			if strings.Contains(lowerKey, "include") || strings.Contains(lowerKey, "exclude") {
				conditionMap, isMap := value.(map[string]interface{})
				if !isMap {
					continue
				}
				// "k8slabel" is tested first because such keys also contain "label".
				if strings.Contains(lowerKey, "k8slabel") {
					for name := range conditionMap {
						logstoreC.K8sLabelSet[name] = struct{}{}
					}
				} else if strings.Contains(lowerKey, "label") {
					for name := range conditionMap {
						logstoreC.ContainerLabelSet[name] = struct{}{}
					}
				}
				if strings.Contains(lowerKey, "env") {
					for name := range conditionMap {
						logstoreC.EnvSet[name] = struct{}{}
					}
				}
			}
			// Both spellings feed the same field; non-boolean values are ignored.
			if strings.Contains(lowerKey, "collectcontainersflag") || strings.Contains(lowerKey, "collectingcontainersmeta") {
				if enabled, isBool := value.(bool); isBool {
					logstoreC.CollectingContainersMeta = enabled
				}
			}
		}
	}
}

// overrideGlobalConfig returns a copy of config.LoongcollectorGlobalConfig
// with the fields present in rawGlobal (the decoded "global" section) applied
// on top of it.
//
// AppendingAllEnvMetaTag is set to true only when rawGlobal is a JSON object
// that does not contain "AgentEnvMetaTagKey"; otherwise it is false. Nil
// PipelineMetaTagKey / AgentEnvMetaTagKey maps are replaced by empty maps.
func overrideGlobalConfig(rawGlobal interface{}) (*config.GlobalConfig, error) {
	globalConfig := &config.GlobalConfig{}
	*globalConfig = config.LoongcollectorGlobalConfig

	rawJSON, err := json.Marshal(rawGlobal)
	if err != nil {
		return nil, err
	}
	// Decode into the struct pointer itself, not into a pointer to that
	// pointer: for `"global": null`, encoding/json would otherwise reset the
	// inner pointer to nil and the field writes below would panic. Decoding
	// this way, a JSON null simply leaves the copied defaults untouched.
	if err = json.Unmarshal(rawJSON, globalConfig); err != nil {
		return nil, err
	}

	globalMap, isMap := rawGlobal.(map[string]interface{})
	_, hasEnvTagKey := globalMap["AgentEnvMetaTagKey"] // reading a nil map is safe
	globalConfig.AppendingAllEnvMetaTag = isMap && !hasEnvTagKey

	if globalConfig.PipelineMetaTagKey == nil {
		globalConfig.PipelineMetaTagKey = make(map[string]string)
	}
	if globalConfig.AgentEnvMetaTagKey == nil {
		globalConfig.AgentEnvMetaTagKey = make(map[string]string)
	}
	return globalConfig, nil
}