in internal/pkg/server/agent.go [435:543]
// configFromUnits assembles a fleet-server configuration from the expected
// state of the agent's input and output units and merges it into a copy of the
// CLI configuration held by the agent.
//
// The generated configuration contains:
//   - fleet.agent.id / fleet.agent.version taken from the agent info (empty
//     strings when no agent info is available),
//   - output.elasticsearch taken from the output unit (see bootstrap handling
//     below),
//   - a single entry in inputs taken from the input unit,
//   - logging.level set to the greater of the two units' log levels.
//
// When the output unit config carries a "bootstrap" object, any running output
// check loop is cancelled, the bootstrap key is removed, and the remaining
// output is tested with esOutputCheck. If that test fails the bootstrap object
// is used as the output instead; for failures other than a version constraint
// or a context cancellation a background esOutputCheckLoop is started as well.
//
// An error is returned when the input config cannot be flattened, when the
// bootstrap attribute is not an object, or when building/merging the final
// configuration fails. APM config problems are only logged.
func (a *Agent) configFromUnits(ctx context.Context) (*config.Config, error) {
	agentID := ""
	agentVersion := ""
	if agentInfo := a.agent.AgentInfo(); agentInfo != nil {
		agentID = agentInfo.ID
		agentVersion = agentInfo.Version
	}

	expInput := a.inputUnit.Expected()
	expOutput := a.outputUnit.Expected()

	// Use whichever unit requests the greater log level.
	logLevel := expInput.LogLevel
	if expOutput.LogLevel > logLevel {
		logLevel = expOutput.LogLevel
	}

	// pass inputs from policy through go-ucfg in order to flatten keys
	// if inputCfg.Source.AsMap() is passed directly, any additional server.* settings will be missed
	var input map[string]interface{}
	inputsConfig, err := ucfg.NewFrom(expInput.Config.Source.AsMap(), config.DefaultOptions...)
	if err != nil {
		return nil, err
	}
	if err := inputsConfig.Unpack(&input, config.DefaultOptions...); err != nil {
		return nil, err
	}

	outMap := expOutput.Config.Source.AsMap()
	// elastic-agent should be setting bootstrap with config provided through enrollment flags
	if bootstrapCfg, ok := outMap["bootstrap"]; ok {
		// Check if an output check loop is running, cancel if it is.
		if a.outputCheckCanceller != nil {
			a.outputCheckCanceller()
			a.outputCheckCanceller = nil
		}

		bootstrap, ok := bootstrapCfg.(map[string]interface{})
		if !ok {
			return nil, fmt.Errorf("output bootstrap attribute is not an object, detected type: %T", bootstrapCfg)
		}
		delete(outMap, "bootstrap")
		injectMissingOutputAttributes(ctx, outMap, bootstrap)

		if err := a.esOutputCheck(ctx, outMap); err != nil {
			// Best effort: the converted output is only attached to log
			// entries below, so a conversion error is deliberately ignored.
			redactedOut, _ := toOutput(outMap)
			redactedOut = config.RedactOutput(&config.Config{Output: redactedOut})

			switch {
			case errors.Is(err, es.ErrElasticVersionConflict), errors.Is(err, ver.ErrUnsupportedVersion):
				zerolog.Ctx(ctx).Error().Err(err).Interface("output", redactedOut).Msg("Elasticsearch version constraint failed for new output")
			case errors.Is(err, context.Canceled):
				// ignore logging cancelation errors in the output check
			default:
				zerolog.Ctx(ctx).Warn().Err(err).Interface("output", redactedOut).Msg("Failed Elasticsearch output configuration test, using bootstrap values.")
				// try to reload periodically
				outputCtx, canceller := context.WithCancel(ctx)
				a.outputCheckCanceller = canceller
				go a.esOutputCheckLoop(outputCtx, outputCheckLoopDelay, outMap)
			}
			outMap = bootstrap // outMap fails to connect, revert to bootstrap
		}
	}

	cfgData, err := ucfg.NewFrom(map[string]interface{}{
		"fleet": map[string]interface{}{
			"agent": map[string]interface{}{
				"id":      agentID,
				"version": agentVersion,
			},
		},
		"output": map[string]interface{}{
			"elasticsearch": outMap,
		},
		"inputs": []interface{}{
			input,
		},
		"logging": map[string]interface{}{
			"level": logLevel.String(),
		},
	})
	if err != nil {
		return nil, err
	}

	// APM settings are optional: conversion or merge problems are logged and
	// the configuration is still returned without them.
	if expAPMCFG := expInput.APMConfig; expAPMCFG != nil {
		instrumentationCfg, err := apmConfigToInstrumentation(expAPMCFG)
		if err != nil {
			zerolog.Ctx(ctx).Warn().Err(err).Msg("Unable to parse expected APM config as instrumentation config")
		} else {
			obj := map[string]interface{}{
				"inputs": []interface{}{map[string]interface{}{
					"server": map[string]interface{}{
						"instrumentation": instrumentationCfg,
					},
				},
				}}
			if err := cfgData.Merge(obj, config.MergeOptions...); err != nil {
				zerolog.Ctx(ctx).Warn().Err(err).Msg("Failed to merge APM config into cfgData")
			}
		}
	}

	// Start from a copy of the CLI config and merge the unit-derived settings
	// into it. NewFrom is used instead of MustNewFrom so that a conversion
	// failure is reported to the caller rather than panicking.
	cliCfg, err := ucfg.NewFrom(a.cliCfg, config.DefaultOptions...)
	if err != nil {
		return nil, fmt.Errorf("load cli config: %w", err)
	}
	if err := cliCfg.Merge(cfgData, config.DefaultOptions...); err != nil {
		return nil, err
	}
	return config.FromConfig(cliCfg)
}