func()

in internal/pkg/server/agent.go [435:543]


// configFromUnits assembles a fleet-server configuration from the expected
// state of the agent's input and output units.
//
// The generated config contains:
//   - fleet.agent.id / fleet.agent.version taken from a.agent.AgentInfo()
//     (both are empty strings when AgentInfo returns nil),
//   - output.elasticsearch set to the output unit's config map,
//   - inputs holding the input unit's config after a round trip through go-ucfg,
//   - logging.level set to the numerically greater of the two units' log levels,
//   - the input unit's APM config, when present and parseable, merged in as
//     server.instrumentation on the input.
//
// When the output config carries a "bootstrap" object, any running output
// check loop is cancelled, the bootstrap key is removed from the output map,
// and the remaining output is tested with a.esOutputCheck. If that test
// fails, the bootstrap map is used as the output instead; for errors other
// than a version constraint failure or context cancellation, a background
// a.esOutputCheckLoop is started to retry the new output.
//
// The generated config is finally merged into a config built from a.cliCfg
// and converted with config.FromConfig. An error is returned if any of the
// ucfg conversions or merges fail, or if "bootstrap" is not an object.
func (a *Agent) configFromUnits(ctx context.Context) (*config.Config, error) {
	agentID := ""
	agentVersion := ""
	agentInfo := a.agent.AgentInfo()
	if agentInfo != nil {
		agentID = agentInfo.ID
		agentVersion = agentInfo.Version
	}
	expInput := a.inputUnit.Expected()
	expOutput := a.outputUnit.Expected()

	// Use whichever unit's log level compares greater.
	logLevel := expInput.LogLevel
	if expOutput.LogLevel > logLevel {
		logLevel = expOutput.LogLevel
	}

	// pass inputs from policy through go-ucfg in order to flatten keys
	// if inputCfg.Source.AsMap() is passed directly, any additional server.* settings will be missed
	var input map[string]interface{}
	inputsConfig, err := ucfg.NewFrom(expInput.Config.Source.AsMap(), config.DefaultOptions...)
	if err != nil {
		return nil, err
	}
	if err := inputsConfig.Unpack(&input, config.DefaultOptions...); err != nil {
		return nil, err
	}
	outMap := expOutput.Config.Source.AsMap()

	// elastic-agent should be setting bootstrap with config provided through enrollment flags
	if bootstrapCfg, ok := outMap["bootstrap"]; ok {
		// Check if an output check loop is running, cancel if it is.
		if a.outputCheckCanceller != nil {
			a.outputCheckCanceller()
			a.outputCheckCanceller = nil
		}

		bootstrap, ok := bootstrapCfg.(map[string]interface{})
		if !ok {
			return nil, fmt.Errorf("output bootstrap attribute is not an object, detected type: %T", bootstrapCfg)
		}
		delete(outMap, "bootstrap")
		injectMissingOutputAttributes(ctx, outMap, bootstrap)

		if err := a.esOutputCheck(ctx, outMap); err != nil {
			// The conversion error is ignored: redactedOut is only used as a
			// field on the log entries below.
			redactedOut, _ := toOutput(outMap)
			redactedOut = config.RedactOutput(&config.Config{Output: redactedOut})

			switch {
			case errors.Is(err, es.ErrElasticVersionConflict), errors.Is(err, ver.ErrUnsupportedVersion):
				zerolog.Ctx(ctx).Error().Err(err).Interface("output", redactedOut).Msg("Elasticsearch version constraint failed for new output")
			case errors.Is(err, context.Canceled):
				// ignore logging cancelation errors in the output check
			default:
				zerolog.Ctx(ctx).Warn().Err(err).Interface("output", redactedOut).Msg("Failed Elasticsearch output configuration test, using bootstrap values.")

				// try to reload periodically
				outputCtx, canceller := context.WithCancel(ctx)
				a.outputCheckCanceller = canceller
				go a.esOutputCheckLoop(outputCtx, outputCheckLoopDelay, outMap)
			}
			outMap = bootstrap // outMap fails to connect, revert to bootstrap
		}
	}

	cfgData, err := ucfg.NewFrom(map[string]interface{}{
		"fleet": map[string]interface{}{
			"agent": map[string]interface{}{
				"id":      agentID,
				"version": agentVersion,
			},
		},
		"output": map[string]interface{}{
			"elasticsearch": outMap,
		},
		"inputs": []interface{}{
			input,
		},
		"logging": map[string]interface{}{
			"level": logLevel.String(),
		},
	})
	if err != nil {
		return nil, err
	}

	// APM settings are best-effort: a parse or merge failure is logged as a
	// warning and the config is still returned without instrumentation.
	if expAPMCFG := expInput.APMConfig; expAPMCFG != nil {
		instrumentationCfg, err := apmConfigToInstrumentation(expAPMCFG)
		if err != nil {
			zerolog.Ctx(ctx).Warn().Err(err).Msg("Unable to parse expected APM config as instrumentation config")
		} else {
			obj := map[string]interface{}{
				"inputs": []interface{}{
					map[string]interface{}{
						"server": map[string]interface{}{
							"instrumentation": instrumentationCfg,
						},
					},
				},
			}
			if err := cfgData.Merge(obj, config.MergeOptions...); err != nil {
				zerolog.Ctx(ctx).Warn().Err(err).Msg("Failed to merge APM config into cfgData")
			}
		}
	}

	// Build the base config from the CLI settings without panicking: a
	// conversion failure is reported to the caller like every other error here.
	cliCfg, err := ucfg.NewFrom(a.cliCfg, config.DefaultOptions...)
	if err != nil {
		return nil, fmt.Errorf("loading CLI config: %w", err)
	}
	if err := cliCfg.Merge(cfgData, config.DefaultOptions...); err != nil {
		return nil, err
	}
	return config.FromConfig(cliCfg)
}