func()

in internal/pkg/server/agent.go [100:223]


func (a *Agent) Run(ctx context.Context) error {
	// ctx is cancelled when a SIGTERM or SIGINT is received.
	log := zerolog.Ctx(ctx)
	a.agent.RegisterDiagnosticHook("fleet-server config", "fleet-server's current configuration", "fleet-server.yml", "application/yml", func() []byte {
		if a.srv == nil {
			log.Warn().Msg("Diagnostics hook failure fleet-server is nil.")
			return nil
		}
		cfg := a.srv.GetConfig()
		if cfg == nil {
			log.Warn().Msg("Diagnostics hook failure config is nil.")
			return nil
		}
		cfg = cfg.Redact()
		p, err := yaml.Marshal(cfg)
		if err != nil {
			log.Error().Err(err).Msg("Diagnostics hook failure config unable to marshal yaml.")
			return nil
		}
		return p
	})
	a.agent.RegisterDiagnosticHook("fleet-server api tls diag", "fleet-server's API TLS config", "fleet-server-api-tls.txt", "text/plain", func() []byte {
		if a.srv == nil {
			log.Warn().Msg("Diagnostics hook failure fleet-server is nil.")
			return []byte(`Diagnostics hook failure fleet-server is nil`)
		}
		cfg := a.srv.GetConfig()
		if cfg == nil || len(cfg.Inputs) == 0 {
			log.Warn().Msg("Diagnostics hook failure config is nil.")
			return []byte(`Diagnostics hook failure config is nil`)
		}
		return cfg.Inputs[0].Server.TLS.DiagCerts()()
	})
	a.agent.RegisterDiagnosticHook("fleet-server output tls diag", "fleet-server's output TLS config", "fleet-server-output-tls.txt", "text/plain", func() []byte {
		if a.srv == nil {
			log.Warn().Msg("Diagnostics hook failure fleet-server is nil.")
			return []byte(`Diagnostics hook failure fleet-server is nil`)
		}
		cfg := a.srv.GetConfig()
		if cfg == nil {
			log.Warn().Msg("Diagnostics hook failure config is nil.")
			return []byte(`Diagnostics hook failure config is nil`)
		}
		return cfg.Output.Elasticsearch.TLS.DiagCerts()()
	})
	a.agent.RegisterOptionalDiagnosticHook("CONN", "fleet-server output request diag", "fleet-server output request trace diagnostics", "fleet-server-output-request.txt", "text/plain", func() []byte {
		if a.srv == nil {
			log.Warn().Msg("Diagnostics hook failure fleet-server is nil.")
			return []byte(`Diagnostics hook failure fleet-server is nil`)
		}
		cfg := a.srv.GetConfig()
		if cfg == nil {
			log.Warn().Msg("Diagnostics hook failure config is nil.")
			return []byte(`Diagnostics hook failure config is nil`)
		}
		ctx, cancel := context.WithTimeout(ctx, time.Second*30) // diag specific context, has a timeout  // TODO(michel-laterman): duration/timeout should be part of the diagnostics action from fleet-server (https://github.com/elastic/fleet-server/issues/3648) and the control protocol (https://github.com/elastic/elastic-agent-client/issues/113)
		defer cancel()
		return cfg.Output.Elasticsearch.DiagRequests(ctx)
	})

	// doneCh is used to track when agent wrapper run loop returns
	doneCh := make(chan struct{})
	go func() {
		defer close(doneCh)

		t := time.NewTicker(1 * time.Second)
		defer t.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case err := <-a.agent.Errors():
				if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, io.EOF) {
					log.Error().Err(err).Msg("Agent wrapper received error.")
				}
			case change := <-a.agent.UnitChanges():
				switch change.Type {
				case client.UnitChangedAdded:
					err := a.unitAdded(ctx, change.Unit)
					if err != nil {
						log.Error().Str("unit", change.Unit.ID()).Err(err)
						_ = change.Unit.UpdateState(client.UnitStateFailed, err.Error(), nil)
					}
				case client.UnitChangedModified:
					err := a.unitModified(ctx, change.Unit)
					if err != nil {
						log.Error().Str("unit", change.Unit.ID()).Err(err)
						_ = change.Unit.UpdateState(client.UnitStateFailed, err.Error(), nil)
					}
				case client.UnitChangedRemoved:
					a.unitRemoved(change.Unit)
				}
			case <-a.chReconfigure:
				err := a.reconfigure(ctx)
				if err != nil && !errors.Is(err, context.Canceled) {
					log.Error().Err(err).Msg("Error when reconfiguring from trigger")
				}
			case <-t.C:
				// Fleet Server is the only component that gets started by Elastic Agent without an Agent ID. We loop
				// here on interval waiting for the Elastic Agent to enroll so then the Agent ID is then set.
				agentInfo := a.agent.AgentInfo()
				if agentInfo != nil && agentInfo.ID != "" {
					// Agent ID is not set for the component.
					t.Stop()
					err := a.reconfigure(ctx)
					if err != nil && !errors.Is(err, context.Canceled) {
						log.Error().Err(err).Msg("Bootstrap error when reconfiguring")
					}
				}
			}
		}
	}()

	log.Info().Msg("starting communication connection back to Elastic Agent")
	err := a.agent.Start(ctx)
	if err != nil && !errors.Is(err, context.Canceled) {
		return err
	}

	<-ctx.Done() // wait for a termination signal
	<-doneCh     // wait for agent wrapper goroutine to terminate

	return nil
}