func waitForFleetServer()

in internal/pkg/agent/cmd/enroll_cmd.go [747:874]


// waitForFleetServer polls the daemon state (via getDaemonState) until the
// "fleet-server" unit reports client.Healthy or client.Degraded, then returns
// the "enrollment_token" string found in that unit's payload (empty when the
// payload has no such string entry).
//
// Parameters:
//   - ctx: cancelling it aborts the wait.
//   - agentSubproc: optional channel that delivers the process state of a
//     spawned Elastic Agent when it exits; may be nil, in which case it is
//     never selected.
//   - log: receives progress messages (Debug on every poll, Info throttled).
//   - timeout: 0 selects the default of 2 minutes; a negative value means no
//     deadline is added to ctx.
//
// It returns an error when the wait is cut short (deadline reached or ctx
// cancelled) or when the spawned agent exits before Fleet Server is up.
func waitForFleetServer(ctx context.Context, agentSubproc <-chan *os.ProcessState, log *logger.Logger, timeout time.Duration) (string, error) {
	if timeout == 0 {
		timeout = 2 * time.Minute
	}
	if timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, timeout)
		defer cancel()
	}
	maxBackoff := timeout
	if maxBackoff <= 0 {
		// indefinite timeout
		maxBackoff = 10 * time.Minute
	}

	// The poller sends exactly one result and then exits. The buffer of one
	// lets that send complete even when this function has already returned
	// (e.g. because the spawned agent exited), so the goroutine never leaks.
	resChan := make(chan waitResult, 1)
	innerCtx, innerCancel := context.WithCancel(context.Background())
	defer innerCancel()
	go func() {
		msg := ""
		msgCount := 0

		// report logs header at Info level the first time it is seen. While
		// the same header keeps repeating, it logs repeat on every sixth
		// consecutive repetition to avoid flooding the log.
		report := func(header, repeat string) {
			if msg != header {
				msg = header
				msgCount = 0
				log.Info(header)
				return
			}
			msgCount++
			if msgCount > 5 {
				msgCount = 0
				log.Info(repeat)
			}
		}

		// NOTE(review): presumably an exponential backoff from 1s up to
		// maxBackoff whose Wait reports false once innerCtx is done — confirm
		// against expBackoffWithContext.
		backExp := expBackoffWithContext(innerCtx, 1*time.Second, maxBackoff)

		for {
			// if the timeout is reached, no response was sent on `res`, therefore
			// send an error and stop polling
			if !backExp.Wait() {
				resChan <- waitResult{err: fmt.Errorf(
					"timed out waiting for Fleet Server to start after %s",
					timeout)}
				return
			}

			state, err := getDaemonState(innerCtx)
			if errors.Is(err, context.Canceled) {
				resChan <- waitResult{err: err}
				return
			}
			if err != nil {
				log.Debugf("%s: %s", waitingForAgent, err)
				report(waitingForAgent, fmt.Sprintf("%s: %s", waitingForAgent, err))
				continue
			}

			unit := getCompUnitFromStatus(state, "fleet-server")
			if unit == nil {
				err = errors.New("no fleet-server application running")
				log.Debugf("%s: %s", waitingForFleetServer, err)
				report(waitingForFleetServer, fmt.Sprintf("%s: %s", waitingForFleetServer, err))
				continue
			}

			log.Debugf("%s: %s - %s", waitingForFleetServer, unit.State, unit.Message)
			if unit.State == client.Degraded || unit.State == client.Healthy {
				// app has started and is running
				if unit.Message != "" {
					log.Infof("Fleet Server - %s", unit.Message)
				}
				// extract the enrollment token from the status payload
				token := ""
				if unit.Payload != nil {
					if enrollToken, ok := unit.Payload["enrollment_token"]; ok {
						if tokenStr, ok := enrollToken.(string); ok {
							token = tokenStr
						}
					}
				}
				resChan <- waitResult{enrollmentToken: token}
				return
			}

			if unit.Message != "" {
				appMsg := fmt.Sprintf("Fleet Server - %s", unit.Message)
				report(appMsg, appMsg)
			}
		}
	}()

	var res waitResult
	// A receive from a nil channel never proceeds, so when agentSubproc is nil
	// its case is simply never chosen and one select covers both situations.
	select {
	case ps := <-agentSubproc:
		res = waitResult{err: fmt.Errorf("spawned Elastic Agent exited unexpectedly: %s", ps)}
	case <-ctx.Done():
		// Stop the poller and collect the single result it is guaranteed to send.
		innerCancel()
		res = <-resChan
	case res = <-resChan:
	}

	if res.err != nil {
		return "", res.err
	}
	return res.enrollmentToken, nil
}