in internal/pkg/agent/application/upgrade/watcher.go [70:247]
func (ch *AgentWatcher) Run(ctx context.Context) {
ch.log.Info("Agent watcher started")
ch.connectCounter = 0
ch.lostCounter = 0
// tracking of an error runs in a separate goroutine, because
// the call to `watch.Recv` blocks and a timer is needed
// to determine if an error lasts longer than the checkInterval.
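// failedReset clears the flip-flop tracking when a new watch connection
// is established; failedCh carries the latest reported error (nil when healthy).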
failedReset := make(chan bool)
failedCh := make(chan error)
go func() {
failedTimer := time.NewTimer(ch.checkInterval)
failedTimer.Stop() // starts stopped
defer failedTimer.Stop() // always stopped on exit
var flipFlopCount int
var failedErr error
for {
select {
case <-ctx.Done():
return
case reset := <-failedReset:
if reset {
flipFlopCount = 0
failedTimer.Stop()
}
case err := <-failedCh:
if err != nil {
if failedErr == nil {
flipFlopCount++
failedTimer.Reset(ch.checkInterval)
ch.log.Errorf("Agent reported failure (starting failed timer): %s", err)
} else {
ch.log.Errorf("Agent reported failure (failed timer already started): %s", err)
}
} else {
if failedErr != nil {
failedTimer.Stop()
ch.log.Info("Agent reported healthy (failed timer stopped)")
}
}
failedErr = err
if flipFlopCount > statusFailureFlipFlopsAllowed {
err := fmt.Errorf("%w '%d' times in a row", ErrAgentFlipFlopFailed, flipFlopCount)
ch.log.Error(err)
ch.notifyChan <- err
}
case <-failedTimer.C:
if failedErr == nil {
// error was cleared; do nothing
continue
}
// error lasted longer than the checkInterval, notify!
ch.notifyChan <- fmt.Errorf("last error was not cleared before checkInterval (%s) elapsed: %w",
ch.checkInterval, failedErr)
}
}
}()
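// main connect/watch loop; `continue LOOP` below restarts the connection
// sequence whenever the watch stream is lost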
LOOP:
for {
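// -1 marks that no PID has been observed yet on this connection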
ch.lastPid = -1
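// wait checkInterval before each connection attempt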
connectTimer := time.NewTimer(ch.checkInterval)
select {
case <-ctx.Done():
connectTimer.Stop()
return
case <-connectTimer.C:
ch.log.Info("Trying to connect to agent")
// block on connection, don't retry the connection, and fail on temporary dial errors.
// this is always a local connection, so it should connect quickly; the timeout is only 1 second
connectCtx, connectCancel := context.WithTimeout(ctx, 1*time.Second)
//nolint:staticcheck // requires changing client signature
err := ch.agentClient.Connect(connectCtx, grpc.WithBlock(), grpc.WithDisableRetry(), grpc.FailOnNonTempDialError(true))
connectCancel()
if err != nil {
ch.connectCounter++
ch.log.Errorf("Failed connecting to running daemon: %s", err)
if ch.checkFailures() {
return
}
// agent is probably not running
continue
}
stateCtx, stateCancel := context.WithCancel(ctx)
watch, err := ch.agentClient.StateWatch(stateCtx)
if err != nil {
// considered a connect error
stateCancel()
ch.agentClient.Disconnect()
ch.log.Errorf("Failed to start state watch: %s", err)
ch.connectCounter++
if ch.checkFailures() {
return
}
// agent is probably not running
continue
}
ch.log.Info("Connected to agent")
// clear the connectCounter as the connection was successfully made;
// a disconnect followed by a reconnect is counted by the lostCounter,
// not by the connectCounter
ch.connectCounter = 0
// failure is tracked only for the life of the connection to
// the watch streaming protocol. either an error that lasts longer
// than the checkInterval or too many flip-flops between error and
// non-error will trigger a reported failure
failedReset <- true
failedCh <- nil
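// read states from the watch stream until it errors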
for {
state, err := watch.Recv()
if err != nil {
ch.log.Debugf("received state: error: %s",
err)
// agent has crashed or exited
stateCancel()
ch.agentClient.Disconnect()
ch.log.Errorf("Lost connection: failed reading next state: %s", err)
ch.lostCounter++
if ch.checkFailures() {
return
}
continue LOOP
}
ch.log.Debugf("received state: %s:%s",
state.State, state.Message)
// gRPC is good at hiding the fact that the connection was lost;
// to ensure that we don't miss a restart, a changed PID means
// we are now talking to a different spawned Elastic Agent
if ch.lastPid == -1 {
ch.lastPid = state.Info.PID
ch.log.Infof("Communicating with PID %d", ch.lastPid)
} else if ch.lastPid != state.Info.PID {
ch.log.Errorf("Communication with PID %d lost, now communicating with PID %d", ch.lastPid, state.Info.PID)
ch.lastPid = state.Info.PID
// count the PID change as a lost connection, but allow
// the communication to continue unless it has become a failure
ch.lostCounter++
if ch.checkFailures() {
stateCancel()
ch.agentClient.Disconnect()
return
}
}
if state.State == client.Failed {
// top-level failure (something is really wrong)
failedCh <- fmt.Errorf("%w: %s", ErrAgentStatusFailed, state.Message)
continue
} else {
// agent is healthy, but a component might not be healthy;
// the upgrade treats an unhealthy component as an issue with the upgrade
var errs []error
for _, comp := range state.Components {
if comp.State == client.Failed {
errs = append(errs, fmt.Errorf("component %s[%v] failed: %s", comp.Name, comp.ID, comp.Message))
}
}
if len(errs) != 0 {
failedCh <- fmt.Errorf("%w: %w", ErrAgentComponentFailed, errors.Join(errs...))
continue
}
}
// nothing has failed
failedCh <- nil
}
}
}
}