func()

in pkg/client/client.go [186:314]


// checkinRoundTrip runs a single check-in stream to completion.
//
// It opens a Checkin stream on a context derived from c.ctx and starts two
// goroutines: a receiver that applies expected-state messages (stop requests
// and configuration updates) and a sender that periodically publishes the
// observed state. The method returns once both goroutines have stopped, or
// immediately if the stream cannot be opened. Errors are reported through
// c.impl.OnError.
func (c *client) checkinRoundTrip() {
	// How often the sender re-examines the observed state for changes.
	const pollInterval = 500 * time.Millisecond

	checkinCtx, checkinCancel := context.WithCancel(c.ctx)
	defer checkinCancel()

	checkinClient, err := c.client.Checkin(checkinCtx)
	if err != nil {
		c.impl.OnError(err)
		return
	}

	var checkinWG sync.WaitGroup
	// done is closed by the receiver when Recv fails; it tells the sender
	// to stop.
	done := make(chan struct{})

	// expected state check-ins
	checkinWG.Add(1)
	go func() {
		defer checkinWG.Done()
		for {
			expected, err := checkinClient.Recv()
			if err != nil {
				// io.EOF is not reported as an error.
				if !errors.Is(err, io.EOF) {
					c.impl.OnError(err)
				}
				close(done)
				return
			}

			// NOTE(review): c.expected is read and written here without a
			// lock — confirm no other goroutine accesses it concurrently.
			switch {
			case c.expected == proto.StateExpected_STOPPING:
				// in stopping state, do nothing with any other expected states
			case expected.State == proto.StateExpected_STOPPING:
				// Elastic Agent is requesting us to stop.
				c.expected = expected.State
				c.impl.OnStop()
			case expected.ConfigStateIdx != c.cfgIdx:
				// Elastic Agent is requesting us to update config.
				c.cfgMu.Lock()
				c.cfgIdx = expected.ConfigStateIdx
				c.cfg = expected.Config
				c.cfgMu.Unlock()
				c.impl.OnConfig(expected.Config)
			}
		}
	}()

	// observed state check-ins
	checkinWG.Add(1)
	go func() {
		defer checkinWG.Done()

		var (
			lastSent        time.Time
			lastSentCfgIdx  uint64
			lastSentStatus  proto.StateObserved_Status
			lastSentMessage string
			lastSentPayload string
		)

		// send publishes one observed-state message and, on success, records
		// what was sent so later iterations can detect changes.
		send := func(cfgIdx uint64, status proto.StateObserved_Status, msg, payload string) error {
			err := checkinClient.Send(&proto.StateObserved{
				Token:          c.token,
				ConfigStateIdx: cfgIdx,
				Status:         status,
				Message:        msg,
				Payload:        payload,
			})
			if err != nil {
				// For a gRPC client stream, io.EOF from Send only means the
				// stream has already ended; the actual status is returned by
				// Recv, and the receiver goroutine reports it. Reporting
				// io.EOF here would just be noise.
				if !errors.Is(err, io.EOF) {
					c.impl.OnError(err)
				}
				return err
			}
			lastSent = time.Now()
			lastSentCfgIdx = cfgIdx
			lastSentStatus = status
			lastSentMessage = msg
			lastSentPayload = payload
			return nil
		}

		t := time.NewTimer(pollInterval)
		defer t.Stop()
		for {
			select {
			case <-done:
				return
			case <-t.C:
			}

			c.cfgMu.RLock()
			cfgIdx := c.cfgIdx
			c.cfgMu.RUnlock()

			c.obsMu.RLock()
			observed := c.observed
			observedMsg := c.observedMessage
			observedPayload := c.observedPayload
			c.obsMu.RUnlock()

			changed := lastSentCfgIdx != cfgIdx ||
				lastSentStatus != observed ||
				lastSentMessage != observedMsg ||
				lastSentPayload != observedPayload

			// Send when nothing has been sent yet on this stream, when the
			// state differs from what was last sent, or when
			// c.minCheckTimeout has elapsed since the last successful send.
			if lastSent.IsZero() || changed || time.Since(lastSent) >= c.minCheckTimeout {
				if send(cfgIdx, observed, observedMsg, observedPayload) != nil {
					return
				}
			}

			// The timer has fired and its channel was drained above, so it
			// can be re-armed directly.
			t.Reset(pollInterval)
		}
	}()

	// wait for both send and recv go routines to stop before
	// starting a new stream.
	checkinWG.Wait()
	// The receiver only exits after Recv has failed, so the stream is already
	// finished at this point; the CloseSend result is deliberately ignored.
	_ = checkinClient.CloseSend()
}