in internal/pkg/agent/cmd/enroll_cmd.go [747:874]
func waitForFleetServer(ctx context.Context, agentSubproc <-chan *os.ProcessState, log *logger.Logger, timeout time.Duration) (string, error) {
if timeout == 0 {
timeout = 2 * time.Minute
}
if timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
maxBackoff := timeout
if maxBackoff <= 0 {
// indefinite timeout
maxBackoff = 10 * time.Minute
}
resChan := make(chan waitResult)
innerCtx, innerCancel := context.WithCancel(context.Background())
defer innerCancel()
go func() {
msg := ""
msgCount := 0
backExp := expBackoffWithContext(innerCtx, 1*time.Second, maxBackoff)
for {
// if the timeout is reached, no response was sent on `res`, therefore
// send an error
if !backExp.Wait() {
resChan <- waitResult{err: fmt.Errorf(
"timed out waiting for Fleet Server to start after %s",
timeout)}
}
state, err := getDaemonState(innerCtx)
if errors.Is(err, context.Canceled) {
resChan <- waitResult{err: err}
return
}
if err != nil {
log.Debugf("%s: %s", waitingForAgent, err)
if msg != waitingForAgent {
msg = waitingForAgent
msgCount = 0
log.Info(waitingForAgent)
} else {
msgCount++
if msgCount > 5 {
msgCount = 0
log.Infof("%s: %s", waitingForAgent, err)
}
}
continue
}
unit := getCompUnitFromStatus(state, "fleet-server")
if unit == nil {
err = errors.New("no fleet-server application running")
log.Debugf("%s: %s", waitingForFleetServer, err)
if msg != waitingForFleetServer {
msg = waitingForFleetServer
msgCount = 0
log.Info(waitingForFleetServer)
} else {
msgCount++
if msgCount > 5 {
msgCount = 0
log.Infof("%s: %s", waitingForFleetServer, err)
}
}
continue
}
log.Debugf("%s: %s - %s", waitingForFleetServer, unit.State, unit.Message)
if unit.State == client.Degraded || unit.State == client.Healthy {
// app has started and is running
if unit.Message != "" {
log.Infof("Fleet Server - %s", unit.Message)
}
// extract the enrollment token from the status payload
token := ""
if unit.Payload != nil {
if enrollToken, ok := unit.Payload["enrollment_token"]; ok {
if tokenStr, ok := enrollToken.(string); ok {
token = tokenStr
}
}
}
resChan <- waitResult{enrollmentToken: token}
break
}
if unit.Message != "" {
appMsg := fmt.Sprintf("Fleet Server - %s", unit.Message)
if msg != appMsg {
msg = appMsg
msgCount = 0
log.Info(appMsg)
} else {
msgCount++
if msgCount > 5 {
msgCount = 0
log.Info(appMsg)
}
}
}
}
}()
var res waitResult
if agentSubproc == nil {
select {
case <-ctx.Done():
innerCancel()
res = <-resChan
case res = <-resChan:
}
} else {
select {
case ps := <-agentSubproc:
res = waitResult{err: fmt.Errorf("spawned Elastic Agent exited unexpectedly: %s", ps)}
case <-ctx.Done():
innerCancel()
res = <-resChan
case res = <-resChan:
}
}
if res.err != nil {
return "", res.err
}
return res.enrollmentToken, nil
}