internal/pkg/server/agent.go (617 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package server
import (
"context"
"errors"
"fmt"
"io"
"strconv"
"strings"
"time"
"github.com/rs/zerolog"
"github.com/elastic/fleet-server/v7/internal/pkg/build"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/es"
"github.com/elastic/fleet-server/v7/internal/pkg/reload"
"github.com/elastic/fleet-server/v7/internal/pkg/sleep"
"github.com/elastic/fleet-server/v7/internal/pkg/state"
"github.com/elastic/fleet-server/v7/internal/pkg/ver"
"gopkg.in/yaml.v3"
"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
"github.com/elastic/go-ucfg"
)
const (
// kAgentModeRestartLoopDelay is how long the agent-mode run loop sleeps before calling srv.Run again after it returned an error.
kAgentModeRestartLoopDelay = 2 * time.Second
// kFleetServer is the only accepted input unit config type; it is also the name reported in the client version info.
kFleetServer = "fleet-server"
// kElasticsearch is the only accepted output unit config type.
kElasticsearch = "elasticsearch"
// kStopped is the message reported together with client.UnitStateStopped.
kStopped = "Stopped"
// verifyNone is the ssl verification_mode value that bootstrap and output ssl settings are compared against.
verifyNone = "none"
// outputCheckLoopDelay is the delay between asynchronous retests of an output configuration that failed its initial test.
// NOTE: Do we want to try to make this configurable in the future?
// It may need different handling as we would want it to be part of the policy definition so a user
// can specify in Kibana, but it only applies to agent mode.
outputCheckLoopDelay = time.Minute
)
// clientUnit is the subset of unit behaviour the Agent depends on: reading the expected
// values of a unit and reporting the observed state back.
type clientUnit interface {
// Expected returns the unit's expected values; the Agent reads Config, State, LogLevel and APMConfig from it.
Expected() client.Expected
// UpdateState reports the observed state of the unit along with a message and an optional payload.
UpdateState(state client.UnitState, message string, payload map[string]interface{}) error
}
// Agent is a fleet-server that runs under the elastic-agent.
// An Agent instance will retrieve connection information from the passed reader (normally stdin).
// Agent uses a client.V2 to gather config data and manage its lifecycle.
type Agent struct {
// cliCfg is the configuration passed to NewAgent; the config built from the units is merged into a copy of it.
cliCfg *ucfg.Config
// bi is the build information reported to the elastic-agent and passed to the server.
bi build.Info
// reloadables are reloaded with every new configuration before the server is started or reloaded.
reloadables []reload.Reloadable
// agent is the V2 client used to communicate with the elastic-agent.
agent client.V2
// outputUnit is the currently accepted output unit, nil if there is none.
outputUnit clientUnit
// inputUnit is the currently accepted input unit, nil if there is none.
inputUnit clientUnit
// srv is the Fleet instance started by start; it is set back to nil by stop.
srv *Fleet
// srvCtx is the context the srv run loop executes with.
srvCtx context.Context
// srvCanceller cancels srvCtx; nil when no server has been started.
srvCanceller context.CancelFunc
// srvDone is closed when the goroutine running srv.Run returns.
srvDone chan struct{}
// outputCheckCanceller cancels the asynchronous output check loop, nil if no loop was started.
outputCheckCanceller context.CancelFunc
// chReconfigure is signalled by the output check loop to request a reconfigure from the Run loop.
chReconfigure chan struct{}
}
// NewAgent builds an Agent whose elastic-agent client reads its connection information from reader.
// The client is created with fleet-server's name and build metadata; any error from creating
// the client is returned unchanged and no Agent is returned in that case.
func NewAgent(cliCfg *ucfg.Config, reader io.Reader, bi build.Info, reloadables ...reload.Reloadable) (*Agent, error) {
	versionInfo := client.VersionInfo{
		Name:      kFleetServer,
		BuildHash: bi.Commit,
		Meta: map[string]string{
			"commit":     bi.Commit,
			"build_time": bi.BuildTime.String(),
		},
	}

	agentClient, _, err := client.NewV2FromReader(reader, versionInfo)
	if err != nil {
		return nil, err
	}

	return &Agent{
		cliCfg:      cliCfg,
		bi:          bi,
		reloadables: reloadables,
		agent:       agentClient,
		// buffered so a single pending reconfigure request does not block its sender
		chReconfigure: make(chan struct{}, 1),
	}, nil
}
// Run starts a Server instance using config from the configured client.
//
// It registers the diagnostic hooks with the elastic-agent client, starts a goroutine that
// reacts to client errors, unit changes, reconfigure requests and the Agent ID becoming
// available, and then starts the client. Run blocks until ctx is done and the goroutine
// has returned.
//
// An error is returned only when starting the client fails with something other than
// context.Canceled.
func (a *Agent) Run(ctx context.Context) error {
	// ctx is cancelled when a SIGTERM or SIGINT is received.
	log := zerolog.Ctx(ctx)

	a.agent.RegisterDiagnosticHook("fleet-server config", "fleet-server's current configuration", "fleet-server.yml", "application/yml", func() []byte {
		if a.srv == nil {
			log.Warn().Msg("Diagnostics hook failure fleet-server is nil.")
			return nil
		}
		cfg := a.srv.GetConfig()
		if cfg == nil {
			log.Warn().Msg("Diagnostics hook failure config is nil.")
			return nil
		}
		// redact the config before it is serialized into the diagnostics output
		cfg = cfg.Redact()
		p, err := yaml.Marshal(cfg)
		if err != nil {
			log.Error().Err(err).Msg("Diagnostics hook failure config unable to marshal yaml.")
			return nil
		}
		return p
	})
	a.agent.RegisterDiagnosticHook("fleet-server api tls diag", "fleet-server's API TLS config", "fleet-server-api-tls.txt", "text/plain", func() []byte {
		if a.srv == nil {
			log.Warn().Msg("Diagnostics hook failure fleet-server is nil.")
			return []byte(`Diagnostics hook failure fleet-server is nil`)
		}
		cfg := a.srv.GetConfig()
		if cfg == nil || len(cfg.Inputs) == 0 {
			log.Warn().Msg("Diagnostics hook failure config is nil.")
			return []byte(`Diagnostics hook failure config is nil`)
		}
		return cfg.Inputs[0].Server.TLS.DiagCerts()()
	})
	a.agent.RegisterDiagnosticHook("fleet-server output tls diag", "fleet-server's output TLS config", "fleet-server-output-tls.txt", "text/plain", func() []byte {
		if a.srv == nil {
			log.Warn().Msg("Diagnostics hook failure fleet-server is nil.")
			return []byte(`Diagnostics hook failure fleet-server is nil`)
		}
		cfg := a.srv.GetConfig()
		if cfg == nil {
			log.Warn().Msg("Diagnostics hook failure config is nil.")
			return []byte(`Diagnostics hook failure config is nil`)
		}
		return cfg.Output.Elasticsearch.TLS.DiagCerts()()
	})
	a.agent.RegisterOptionalDiagnosticHook("CONN", "fleet-server output request diag", "fleet-server output request trace diagnostics", "fleet-server-output-request.txt", "text/plain", func() []byte {
		if a.srv == nil {
			log.Warn().Msg("Diagnostics hook failure fleet-server is nil.")
			return []byte(`Diagnostics hook failure fleet-server is nil`)
		}
		cfg := a.srv.GetConfig()
		if cfg == nil {
			log.Warn().Msg("Diagnostics hook failure config is nil.")
			return []byte(`Diagnostics hook failure config is nil`)
		}
		ctx, cancel := context.WithTimeout(ctx, time.Second*30) // diag specific context, has a timeout // TODO(michel-laterman): duration/timeout should be part of the diagnostics action from fleet-server (https://github.com/elastic/fleet-server/issues/3648) and the control protocol (https://github.com/elastic/elastic-agent-client/issues/113)
		defer cancel()
		return cfg.Output.Elasticsearch.DiagRequests(ctx)
	})

	// doneCh is used to track when agent wrapper run loop returns
	doneCh := make(chan struct{})
	go func() {
		defer close(doneCh)
		t := time.NewTicker(1 * time.Second)
		defer t.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case err := <-a.agent.Errors():
				if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, io.EOF) {
					log.Error().Err(err).Msg("Agent wrapper received error.")
				}
			case change := <-a.agent.UnitChanges():
				switch change.Type {
				case client.UnitChangedAdded:
					err := a.unitAdded(ctx, change.Unit)
					if err != nil {
						// a zerolog event is only written once Msg/Msgf/Send is called
						log.Error().Str("unit", change.Unit.ID()).Err(err).Msg("Failed to handle added unit.")
						_ = change.Unit.UpdateState(client.UnitStateFailed, err.Error(), nil)
					}
				case client.UnitChangedModified:
					err := a.unitModified(ctx, change.Unit)
					if err != nil {
						// a zerolog event is only written once Msg/Msgf/Send is called
						log.Error().Str("unit", change.Unit.ID()).Err(err).Msg("Failed to handle modified unit.")
						_ = change.Unit.UpdateState(client.UnitStateFailed, err.Error(), nil)
					}
				case client.UnitChangedRemoved:
					a.unitRemoved(change.Unit)
				}
			case <-a.chReconfigure:
				err := a.reconfigure(ctx)
				if err != nil && !errors.Is(err, context.Canceled) {
					log.Error().Err(err).Msg("Error when reconfiguring from trigger")
				}
			case <-t.C:
				// Fleet Server is the only component that gets started by Elastic Agent without an Agent ID. We loop
				// here on interval waiting for the Elastic Agent to enroll so then the Agent ID is then set.
				agentInfo := a.agent.AgentInfo()
				if agentInfo != nil && agentInfo.ID != "" {
					// Agent ID is now set for the component; stop polling and reconfigure with it.
					t.Stop()
					err := a.reconfigure(ctx)
					if err != nil && !errors.Is(err, context.Canceled) {
						log.Error().Err(err).Msg("Bootstrap error when reconfiguring")
					}
				}
			}
		}
	}()

	log.Info().Msg("starting communication connection back to Elastic Agent")
	err := a.agent.Start(ctx)
	if err != nil && !errors.Is(err, context.Canceled) {
		return err
	}

	<-ctx.Done() // wait for a termination signal
	<-doneCh     // wait for agent wrapper goroutine to terminate

	return nil
}
// UpdateState forwards the given state, message and payload to the input unit and then to
// the output unit, skipping whichever is not set. Errors reported by the units are
// deliberately ignored, so the returned error is always nil.
func (a *Agent) UpdateState(state client.UnitState, message string, payload map[string]interface{}) error {
	forward := func(unit clientUnit) {
		if unit == nil {
			return
		}
		// best effort: a failed state report must not fail the caller
		_ = unit.UpdateState(state, message, payload)
	}

	forward(a.inputUnit)
	forward(a.outputUnit)
	return nil
}
// unitAdded handles a unit that has been added by the elastic-agent.
//
// An input unit must have the fleet-server config type and an output unit the elasticsearch
// config type; any other config type is marked failed on the unit and nil is returned.
// An accepted unit replaces the current unit of the same kind (the replaced unit is marked
// stopped). The server is only started once both an input and an output unit are present;
// until then the new unit is marked starting.
// An error is returned for unit types that are neither input nor output, or when starting fails.
func (a *Agent) unitAdded(ctx context.Context, unit *client.Unit) error {
	switch unit.Type() {
	case client.UnitTypeInput:
		cfgType := unit.Expected().Config.Type
		if cfgType != kFleetServer {
			// not support input type
			_ = unit.UpdateState(client.UnitStateFailed, fmt.Sprintf("%s is an unsupported input type", cfgType), nil)
			return nil
		}
		if previous := a.inputUnit; previous != nil {
			// original input unit is being stopped; swapping in this unit as the new input unit
			_ = previous.UpdateState(client.UnitStateStopped, kStopped, nil)
		}
		a.inputUnit = unit
		if a.outputUnit == nil {
			// waiting for output unit to really start Fleet Server
			_ = unit.UpdateState(client.UnitStateStarting, "waiting for output unit", nil)
			return nil
		}
		return a.start(ctx)
	case client.UnitTypeOutput:
		cfgType := unit.Expected().Config.Type
		if cfgType != kElasticsearch {
			// not support output type
			_ = unit.UpdateState(client.UnitStateFailed, fmt.Sprintf("%s is an unsupported output type", cfgType), nil)
			return nil
		}
		if previous := a.outputUnit; previous != nil {
			// original output unit is being stopped; swapping in this unit as the new output unit
			_ = previous.UpdateState(client.UnitStateStopped, kStopped, nil)
		}
		a.outputUnit = unit
		if a.inputUnit == nil {
			// waiting for input unit to really start Fleet Server
			_ = unit.UpdateState(client.UnitStateStarting, "waiting for input unit", nil)
			return nil
		}
		return a.start(ctx)
	default:
		return fmt.Errorf("unknown unit type %v", unit.Type())
	}
}
// unitModified handles a change to the expected values of a unit.
//
// Units that are not the Agent's current input/output unit are ignored. For the current
// units an expected healthy state triggers a reconfigure (only when the other unit is
// present as well) and an expected stopped state stops the server.
// An error is returned for an unknown unit type, an unknown expected state, or when the
// reconfigure fails.
func (a *Agent) unitModified(ctx context.Context, unit *client.Unit) error {
	expected := unit.Expected()

	// tracked is the unit of the same kind the Agent currently holds, counterpart the other kind.
	var tracked, counterpart clientUnit
	switch unit.Type() {
	case client.UnitTypeInput:
		tracked, counterpart = a.inputUnit, a.outputUnit
	case client.UnitTypeOutput:
		tracked, counterpart = a.outputUnit, a.inputUnit
	default:
		return fmt.Errorf("unknown unit type %v", unit.Type())
	}

	if tracked != unit {
		// not our unit; would have been marked failed in unitAdded; do nothing
		return nil
	}

	switch expected.State {
	case client.UnitStateHealthy:
		if counterpart == nil {
			// still missing the other unit; would have been marked starting already; do nothing
			return nil
		}
		// configuration modified (should still be running)
		return a.reconfigure(ctx)
	case client.UnitStateStopped:
		// unit should be stopped
		a.stop()
		return nil
	default:
		return fmt.Errorf("unknown unit state %v", expected.State)
	}
}
// unitRemoved handles the removal of a unit. When the removed unit is the Agent's current
// input or output unit the server is stopped first and the matching reference is cleared
// afterwards; removal of any other unit is a no-op.
func (a *Agent) unitRemoved(unit *client.Unit) {
	wasInput := a.inputUnit == unit
	wasOutput := a.outputUnit == unit
	if !wasInput && !wasOutput {
		return
	}

	// stop while both references are still set so stop can report on them
	a.stop()

	if wasInput {
		a.inputUnit = nil
	}
	if wasOutput {
		a.outputUnit = nil
	}
}
// start creates the Fleet server from the current units and runs it in a goroutine.
// If a server already exists the call is handed over to reconfigure instead.
//
// The goroutine calls srv.Run again, after a delay of kAgentModeRestartLoopDelay, for as
// long as Run returns an error other than context.Canceled; it closes srvDone when it returns.
// An error is returned when the config cannot be built, a reloadable fails to reload, or
// the server cannot be created.
func (a *Agent) start(ctx context.Context) error {
	if a.srv != nil {
		return a.reconfigure(ctx)
	}

	cfg, err := a.configFromUnits(ctx)
	if err != nil {
		return err
	}

	// reload the generic reloadables
	for _, reloadable := range a.reloadables {
		if err := reloadable.Reload(ctx, cfg); err != nil {
			return err
		}
	}

	reporter := state.NewChained(state.NewLog(zerolog.Ctx(ctx)), a)
	srv, err := NewFleet(a.bi, reporter, false)
	if err != nil {
		return err
	}

	done := make(chan struct{})
	runCtx, cancelRun := context.WithCancel(ctx)
	go func() {
		defer close(done)
		for {
			runErr := srv.Run(runCtx, cfg)
			if runErr == nil || errors.Is(runErr, context.Canceled) {
				return
			}
			// sleep some before calling Run again
			_ = sleep.WithContext(runCtx, kAgentModeRestartLoopDelay)
		}
	}()

	a.srv = srv
	a.srvCtx = runCtx
	a.srvCanceller = cancelRun
	a.srvDone = done
	return nil
}
// reconfigure rebuilds the config from the current units and applies it to the running
// server. If no server exists yet the call is handed over to start instead.
// An error is returned when the config cannot be built, a reloadable fails to reload, or
// the server fails to reload.
func (a *Agent) reconfigure(ctx context.Context) error {
	if a.srv == nil {
		return a.start(ctx)
	}

	newCfg, err := a.configFromUnits(ctx)
	if err != nil {
		return err
	}

	// reload the generic reloadables
	// Currently logger is the only reloadable
	for _, reloadable := range a.reloadables {
		if err := reloadable.Reload(ctx, newCfg); err != nil {
			return err
		}
	}

	return a.srv.Reload(ctx, newCfg)
}
// stop is called when expected config state indicates an input or output should stop.
// It is a no-op when no server has been started. Otherwise it cancels a pending output
// check loop, cancels the server context, waits for the run loop goroutine to finish and
// finally marks the remaining units as stopped.
func (a *Agent) stop() {
	cancelSrv := a.srvCanceller
	if cancelSrv == nil {
		return
	}

	if cancelCheck := a.outputCheckCanceller; cancelCheck != nil {
		cancelCheck()
		a.outputCheckCanceller = nil
	}

	// clear the server references before cancelling
	a.srvCanceller = nil
	a.srvCtx = nil
	a.srv = nil

	cancelSrv()
	<-a.srvDone // wait for srv.Run loop to terminate either because root-context received a signal, or stop has been called
	a.srvDone = nil

	markStopped := func(unit clientUnit) {
		if unit != nil {
			_ = unit.UpdateState(client.UnitStateStopped, kStopped, nil)
		}
	}
	markStopped(a.inputUnit)
	markStopped(a.outputUnit)
}
// configFromUnits takes both inputUnit and outputUnit and creates a single configuration just like fleet server was
// being started from a configuration file.
//
// The resulting config contains the agent ID and version (when the client already knows
// them), the output, the input, the higher of the two units' log levels and, when present,
// the APM instrumentation settings. It is merged into a copy of the CLI config.
//
// When the output contains a "bootstrap" attribute the policy output is completed with
// the bootstrap values and tested against Elasticsearch. If the test fails the bootstrap
// values are used as output; for failures other than a version constraint or a cancelled
// context an asynchronous check loop is started that requests a reconfigure once the
// policy output becomes usable.
//
// An error is returned when either unit is missing, when the input or output cannot be
// parsed, or when the configs cannot be merged.
func (a *Agent) configFromUnits(ctx context.Context) (*config.Config, error) {
	// The Run loop can request a (re)configure before both units have been added, or
	// after one has been removed; fail with an error instead of dereferencing a nil unit.
	if a.inputUnit == nil || a.outputUnit == nil {
		return nil, errors.New("unable to build config: both an input and an output unit are required")
	}

	agentID := ""
	agentVersion := ""
	agentInfo := a.agent.AgentInfo()
	if agentInfo != nil {
		agentID = agentInfo.ID
		agentVersion = agentInfo.Version
	}
	expInput := a.inputUnit.Expected()
	expOutput := a.outputUnit.Expected()
	// use whichever of the two log levels compares greater
	logLevel := expInput.LogLevel
	if expOutput.LogLevel > logLevel {
		logLevel = expOutput.LogLevel
	}

	// pass inputs from policy through go-ucfg in order to flatten keys
	// if inputCfg.Source.AsMap() is passed directly, any additional server.* settings will be missed
	var input map[string]interface{}
	inputsConfig, err := ucfg.NewFrom(expInput.Config.Source.AsMap(), config.DefaultOptions...)
	if err != nil {
		return nil, err
	}
	if err := inputsConfig.Unpack(&input, config.DefaultOptions...); err != nil {
		return nil, err
	}

	outMap := expOutput.Config.Source.AsMap()
	// elastic-agent should be setting bootstrap with config provided through enrollment flags
	if bootstrapCfg, ok := outMap["bootstrap"]; ok {
		// Check if an output check loop is running, cancel if it is.
		if a.outputCheckCanceller != nil {
			a.outputCheckCanceller()
			a.outputCheckCanceller = nil
		}
		bootstrap, ok := bootstrapCfg.(map[string]interface{})
		if !ok {
			return nil, fmt.Errorf("output bootstrap attribute is not an object, detected type: %T", bootstrapCfg)
		}
		delete(outMap, "bootstrap")
		injectMissingOutputAttributes(ctx, outMap, bootstrap)
		if err := a.esOutputCheck(ctx, outMap); err != nil {
			redactedOut, _ := toOutput(outMap)
			redactedOut = config.RedactOutput(&config.Config{Output: redactedOut})
			switch {
			case errors.Is(err, es.ErrElasticVersionConflict) || errors.Is(err, ver.ErrUnsupportedVersion):
				zerolog.Ctx(ctx).Error().Err(err).Interface("output", redactedOut).Msg("Elasticsearch version constraint failed for new output")
			case errors.Is(err, context.Canceled):
				// ignore logging cancelation errors in the output check
			default:
				zerolog.Ctx(ctx).Warn().Err(err).Interface("output", redactedOut).Msg("Failed Elasticsearch output configuration test, using bootstrap values.")
				// try to reload periodically
				outputCtx, canceller := context.WithCancel(ctx)
				a.outputCheckCanceller = canceller
				go a.esOutputCheckLoop(outputCtx, outputCheckLoopDelay, outMap)
			}
			outMap = bootstrap // outMap fails to connect, revert to bootstrap
		}
	}

	cfgData, err := ucfg.NewFrom(map[string]interface{}{
		"fleet": map[string]interface{}{
			"agent": map[string]interface{}{
				"id":      agentID,
				"version": agentVersion,
			},
		},
		"output": map[string]interface{}{
			"elasticsearch": outMap,
		},
		"inputs": []interface{}{
			input,
		},
		"logging": map[string]interface{}{
			"level": logLevel.String(),
		},
	})
	if err != nil {
		return nil, err
	}

	// APM settings are optional; failing to apply them only logs a warning
	if expAPMCFG := expInput.APMConfig; expAPMCFG != nil {
		instrumentationCfg, err := apmConfigToInstrumentation(expAPMCFG)
		if err != nil {
			zerolog.Ctx(ctx).Warn().Err(err).Msg("Unable to parse expected APM config as instrumentation config")
		} else {
			obj := map[string]interface{}{
				"inputs": []interface{}{map[string]interface{}{
					"server": map[string]interface{}{
						"instrumentation": instrumentationCfg,
					},
				},
				}}
			err = cfgData.Merge(obj, config.MergeOptions...)
			if err != nil {
				zerolog.Ctx(ctx).Warn().Err(err).Msg("Failed to merge APM config into cfgData")
			}
		}
	}

	// merge into a copy of the CLI config so a.cliCfg itself is not modified
	cliCfg := ucfg.MustNewFrom(a.cliCfg, config.DefaultOptions...)
	err = cliCfg.Merge(cfgData, config.DefaultOptions...)
	if err != nil {
		return nil, err
	}
	return config.FromConfig(cliCfg)
}
// apmConfigToInstrumentation converts the elastic section of the passed APMConfig into the
// Instrumentation config used by fleet-server. The result is always enabled; the
// transaction sample rate is only set when the source carries a sampling rate.
// An error is returned when the APMConfig has no elastic section.
func apmConfigToInstrumentation(src *proto.APMConfig) (config.Instrumentation, error) {
	elastic := src.GetElastic()
	if elastic == nil {
		return config.Instrumentation{}, fmt.Errorf("unable to transform APMConfig to instrumentation")
	}

	tlsSrc := elastic.GetTls()
	instrumentation := config.Instrumentation{
		Enabled: true,
		TLS: config.InstrumentationTLS{
			SkipVerify:        tlsSrc.GetSkipVerify(),
			ServerCertificate: tlsSrc.GetServerCert(),
			ServerCA:          tlsSrc.GetServerCa(),
		},
		Environment:  elastic.GetEnvironment(),
		APIKey:       elastic.GetApiKey(),
		SecretToken:  elastic.GetSecretToken(),
		Hosts:        elastic.GetHosts(),
		GlobalLabels: elastic.GetGlobalLabels(),
	}

	if rate := elastic.SamplingRate; rate != nil {
		// set the sampling rate in config
		instrumentation.TransactionSampleRate = strconv.FormatFloat(float64(*rate), 'f', -1, 32)
	}
	return instrumentation, nil
}
// injectMissingOutputAttributes copies an explicit set of attributes from bootstrap into
// outMap when outMap does not define them itself. The same is done for a fixed set of
// attributes nested under the "ssl" key.
//
// If the ssl section of outMap provides a CA (see checkForCA) and bootstrap sets
// verification_mode to none, verification_mode is removed from the resulting ssl section.
// Only a fixed list of keys is injected (instead of recursively iterating over all keys
// in bootstrap) in order to avoid injecting any unnecessary/deprecated attributes.
//
// When either ssl attribute is present but not an object a warning is logged and the ssl
// section of outMap is left untouched.
func injectMissingOutputAttributes(ctx context.Context, outMap, bootstrap map[string]interface{}) {
	injectKeys([]string{
		"protocol",
		"hosts",
		"path",
		"service_token",
		"service_token_path",
		"headers",
		"proxy_url",
		"proxy_disable",
		"proxy_headers",
	}, outMap, bootstrap)

	// handle nested structs in bootstrap, currently we just support some ssl config
	rawBootstrapSSL, hasBootstrapSSL := bootstrap["ssl"]
	if !hasBootstrapSSL {
		// bootstrap has no ssl attributes
		return
	}
	bootstrapSSL, isObject := rawBootstrapSSL.(map[string]interface{})
	if !isObject {
		// ssl is not a map
		// if bootstrap is used as output this will cause a parsing issue and fail later
		zerolog.Ctx(ctx).Warn().Interface("ssl_attribute", rawBootstrapSSL).Msg("Bootstrap ssl attribute is not an object.")
		return
	}
	// a missing or non-string verification_mode yields "" here, which never equals verifyNone
	bootstrapMode, _ := bootstrapSSL["verification_mode"].(string)
	bootstrapVerifyNone := bootstrapMode == verifyNone

	outputSSL := map[string]interface{}{}
	outputProvidesCA := false
	if rawOutputSSL, hasOutputSSL := outMap["ssl"]; hasOutputSSL {
		policySSL, isObject := rawOutputSSL.(map[string]interface{})
		if !isObject {
			// output.ssl is not a map
			// this will fail to parse later
			zerolog.Ctx(ctx).Warn().Interface("ssl_attribute", rawOutputSSL).Msg("Policy ssl attribute is not an object.")
			return
		}
		outputSSL = policySSL
		// evaluated before injection so only the output's own ssl attributes count
		outputProvidesCA = checkForCA(policySSL)
	}

	// keys that will appear under the "ssl" key
	injectKeys([]string{
		"verification_mode",
		"certificate_authorities",
		"ca_trusted_fingerprint",
		"certificate",
		"key",
	}, outputSSL, bootstrapSSL)

	// drop verification_mode when bootstrap disables verification but the output provides a CA of some sort
	if outputProvidesCA && bootstrapVerifyNone {
		delete(outputSSL, "verification_mode")
	}
	outMap["ssl"] = outputSSL
}
// injectKeys copies src[key] into dst for every listed key that is present in src and
// absent from dst. Keys already present in dst are never overwritten, and keys that are
// not listed are never copied.
func injectKeys(keys []string, dst, src map[string]interface{}) {
	for _, name := range keys {
		if _, present := dst[name]; present {
			// dst already defines the key, keep its value
			continue
		}
		if value, available := src[name]; available {
			dst[name] = value
		}
	}
}
// checkForCA reports whether the passed cfg provides a CA: either a non-empty
// certificate_authorities list or a non-empty ca_trusted_fingerprint string.
// It always reports false when cfg sets verification_mode to none.
func checkForCA(cfg map[string]interface{}) bool {
	// a missing key or a value of another type fails the assertion and is skipped
	if mode, isString := cfg["verification_mode"].(string); isString && mode == verifyNone {
		return false
	}
	if authorities, isList := cfg["certificate_authorities"].([]interface{}); isList && len(authorities) > 0 {
		return true
	}
	fingerprint, isString := cfg["ca_trusted_fingerprint"].(string)
	return isString && fingerprint != ""
}
// toOutput unpacks the passed map into an Elasticsearch output config and wraps it in a
// config.Output. The protocol is set to https as soon as one of the hosts starts with
// "https" (compared case-insensitively).
// An error is returned when the map cannot be converted or unpacked.
func toOutput(data map[string]interface{}) (config.Output, error) {
	const httpsSchema = "https"

	parsed, err := ucfg.NewFrom(data, config.DefaultOptions...)
	if err != nil {
		return config.Output{}, err
	}

	var elasticsearch config.Elasticsearch
	if err := parsed.Unpack(&elasticsearch, config.DefaultOptions...); err != nil {
		return config.Output{}, err
	}

	for _, host := range elasticsearch.Hosts {
		if strings.HasPrefix(strings.ToLower(host), httpsSchema) {
			elasticsearch.Protocol = httpsSchema
			break
		}
	}

	return config.Output{Elasticsearch: elasticsearch}, nil
}
// esOutputCheck tests the passed output config by creating an Elasticsearch client from
// it and running the version compatibility check against the Agent's build version.
// A nil return means the output passed the test; otherwise the conversion, client or
// compatibility error is returned.
func (a *Agent) esOutputCheck(ctx context.Context, cfg map[string]interface{}) error {
	output, err := toOutput(cfg)
	if err != nil {
		return fmt.Errorf("unable to convert map into output object: %w", err)
	}

	testCfg := &config.Config{Output: output}
	// disable instrumentation for output config test
	clientOpts := elasticsearchOptions(false, a.bi)
	esClient, err := es.NewClient(ctx, testCfg, false, clientOpts...)
	if err != nil {
		return err
	}

	_, err = ver.CheckCompatibility(ctx, esClient, a.bi.Version)
	return err
}
// esOutputCheckLoop will periodically retest the passed (output) config and signal chReconfigure if it succeeds then return.
// If the context is canceled, or an ErrElasticVersionConflict or ErrUnsupportedVersion is returned (by the test) it will return.
//
// delay is the time to wait before every test, including the first one. Any other test
// failure is logged at debug level and retried after delay.
func (a *Agent) esOutputCheckLoop(ctx context.Context, delay time.Duration, cfg map[string]interface{}) {
	for {
		if err := sleep.WithContext(ctx, delay); err != nil {
			zerolog.Ctx(ctx).Debug().Msg("Async output check context cancelled")
			return
		}
		err := a.esOutputCheck(ctx, cfg)
		if err == nil {
			zerolog.Ctx(ctx).Debug().Msg("Async output check successful")
			// Do not block forever on the send: once ctx is cancelled the result is no
			// longer wanted and the receiver may already be gone.
			select {
			case a.chReconfigure <- struct{}{}:
			case <-ctx.Done():
				zerolog.Ctx(ctx).Debug().Msg("Async output check context cancelled")
			}
			return
		}
		if errors.Is(err, context.Canceled) {
			zerolog.Ctx(ctx).Debug().Msg("Async output check context cancelled")
			return
		}
		outCfg, _ := toOutput(cfg)
		// connected to invalid ES version, retrying cannot succeed
		if errors.Is(err, es.ErrElasticVersionConflict) || errors.Is(err, ver.ErrUnsupportedVersion) {
			zerolog.Ctx(ctx).Error().Err(err).Interface("output", config.RedactOutput(&config.Config{Output: outCfg})).Msg("Elasticsearch version constraint failed for new output")
			return
		}
		zerolog.Ctx(ctx).Debug().Err(err).Interface("output", config.RedactOutput(&config.Config{Output: outCfg})).Msgf("Async output check failed, will retry after %v", delay)
	}
}