in internal/pkg/server/agent.go [100:223]
// Run registers the fleet-server diagnostic hooks with the agent client,
// starts a goroutine that services the client's error, unit-change,
// reconfigure and bootstrap-ticker events, and then starts the client itself.
//
// ctx is cancelled when a SIGTERM or SIGINT is received. Run blocks until ctx
// is done and the event goroutine has exited, then returns nil. If starting
// the client fails with anything other than context.Canceled, that error is
// returned immediately.
func (a *Agent) Run(ctx context.Context) error {
	log := zerolog.Ctx(ctx)

	// Hook: the current configuration, redacted and rendered as YAML.
	// Returns nil (no payload) when the server or config is unavailable or
	// the config cannot be marshalled.
	a.agent.RegisterDiagnosticHook("fleet-server config", "fleet-server's current configuration", "fleet-server.yml", "application/yml", func() []byte {
		if a.srv == nil {
			log.Warn().Msg("Diagnostics hook failure fleet-server is nil.")
			return nil
		}
		cfg := a.srv.GetConfig()
		if cfg == nil {
			log.Warn().Msg("Diagnostics hook failure config is nil.")
			return nil
		}
		cfg = cfg.Redact()
		p, err := yaml.Marshal(cfg)
		if err != nil {
			log.Error().Err(err).Msg("Diagnostics hook failure config unable to marshal yaml.")
			return nil
		}
		return p
	})

	// Hook: TLS diagnostics for the API server of the first configured input.
	// Unlike the config hook, failures are reported in the returned payload.
	a.agent.RegisterDiagnosticHook("fleet-server api tls diag", "fleet-server's API TLS config", "fleet-server-api-tls.txt", "text/plain", func() []byte {
		if a.srv == nil {
			log.Warn().Msg("Diagnostics hook failure fleet-server is nil.")
			return []byte(`Diagnostics hook failure fleet-server is nil`)
		}
		cfg := a.srv.GetConfig()
		// NOTE(review): a config with no inputs is reported with the same
		// "config is nil" message as a nil config.
		if cfg == nil || len(cfg.Inputs) == 0 {
			log.Warn().Msg("Diagnostics hook failure config is nil.")
			return []byte(`Diagnostics hook failure config is nil`)
		}
		// DiagCerts returns a function; invoke it right away to get the bytes.
		return cfg.Inputs[0].Server.TLS.DiagCerts()()
	})

	// Hook: TLS diagnostics for the Elasticsearch output.
	a.agent.RegisterDiagnosticHook("fleet-server output tls diag", "fleet-server's output TLS config", "fleet-server-output-tls.txt", "text/plain", func() []byte {
		if a.srv == nil {
			log.Warn().Msg("Diagnostics hook failure fleet-server is nil.")
			return []byte(`Diagnostics hook failure fleet-server is nil`)
		}
		cfg := a.srv.GetConfig()
		if cfg == nil {
			log.Warn().Msg("Diagnostics hook failure config is nil.")
			return []byte(`Diagnostics hook failure config is nil`)
		}
		// DiagCerts returns a function; invoke it right away to get the bytes.
		return cfg.Output.Elasticsearch.TLS.DiagCerts()()
	})

	// Optional hook (registered with the "CONN" parameter): request trace
	// diagnostics against the Elasticsearch output.
	a.agent.RegisterOptionalDiagnosticHook("CONN", "fleet-server output request diag", "fleet-server output request trace diagnostics", "fleet-server-output-request.txt", "text/plain", func() []byte {
		if a.srv == nil {
			log.Warn().Msg("Diagnostics hook failure fleet-server is nil.")
			return []byte(`Diagnostics hook failure fleet-server is nil`)
		}
		cfg := a.srv.GetConfig()
		if cfg == nil {
			log.Warn().Msg("Diagnostics hook failure config is nil.")
			return []byte(`Diagnostics hook failure config is nil`)
		}
		// Diag specific context derived from Run's ctx, with a 30s timeout.
		// TODO(michel-laterman): duration/timeout should be part of the diagnostics action from fleet-server (https://github.com/elastic/fleet-server/issues/3648) and the control protocol (https://github.com/elastic/elastic-agent-client/issues/113)
		ctx, cancel := context.WithTimeout(ctx, time.Second*30)
		defer cancel()
		return cfg.Output.Elasticsearch.DiagRequests(ctx)
	})

	// doneCh is used to track when agent wrapper run loop returns
	doneCh := make(chan struct{})
	go func() {
		defer close(doneCh)
		t := time.NewTicker(1 * time.Second)
		defer t.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case err := <-a.agent.Errors():
				// Cancellation and EOF are not logged as errors.
				if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, io.EOF) {
					log.Error().Err(err).Msg("Agent wrapper received error.")
				}
			case change := <-a.agent.UnitChanges():
				switch change.Type {
				case client.UnitChangedAdded:
					err := a.unitAdded(ctx, change.Unit)
					if err != nil {
						// The event must be terminated with Msg, otherwise
						// zerolog never writes it.
						log.Error().Str("unit", change.Unit.ID()).Err(err).Msg("Agent wrapper failed to add unit.")
						// Best effort: the error from reporting the failed
						// state is deliberately discarded.
						_ = change.Unit.UpdateState(client.UnitStateFailed, err.Error(), nil)
					}
				case client.UnitChangedModified:
					err := a.unitModified(ctx, change.Unit)
					if err != nil {
						// The event must be terminated with Msg, otherwise
						// zerolog never writes it.
						log.Error().Str("unit", change.Unit.ID()).Err(err).Msg("Agent wrapper failed to modify unit.")
						// Best effort: the error from reporting the failed
						// state is deliberately discarded.
						_ = change.Unit.UpdateState(client.UnitStateFailed, err.Error(), nil)
					}
				case client.UnitChangedRemoved:
					a.unitRemoved(change.Unit)
				}
			case <-a.chReconfigure:
				err := a.reconfigure(ctx)
				if err != nil && !errors.Is(err, context.Canceled) {
					log.Error().Err(err).Msg("Error when reconfiguring from trigger")
				}
			case <-t.C:
				// Fleet Server is the only component that gets started by Elastic Agent without an Agent ID. We loop
				// here on interval waiting for the Elastic Agent to enroll so then the Agent ID is then set.
				agentInfo := a.agent.AgentInfo()
				if agentInfo != nil && agentInfo.ID != "" {
					// Agent ID is now set for the component: stop polling
					// and reconfigure once with it.
					t.Stop()
					err := a.reconfigure(ctx)
					if err != nil && !errors.Is(err, context.Canceled) {
						log.Error().Err(err).Msg("Bootstrap error when reconfiguring")
					}
				}
			}
		}
	}()

	log.Info().Msg("starting communication connection back to Elastic Agent")
	err := a.agent.Start(ctx)
	if err != nil && !errors.Is(err, context.Canceled) {
		// NOTE(review): on this path the goroutine above is not waited for;
		// it only exits once ctx is cancelled.
		return err
	}
	<-ctx.Done() // wait for a termination signal
	<-doneCh     // wait for agent wrapper goroutine to terminate
	return nil
}