in pkg/client/client.go [186:314]
// checkinRoundTrip runs one check-in stream from start to finish.
//
// It opens the Checkin stream on a context derived from c.ctx and starts two
// goroutines: a receiver that applies the expected states read from the
// stream, and a sender that reports the observed state. The method returns
// once both goroutines have exited and the send side of the stream has been
// closed. Errors from opening, receiving, or sending are handed to
// c.impl.OnError; an io.EOF from Recv is not reported.
func (c *client) checkinRoundTrip() {
	streamCtx, cancelStream := context.WithCancel(c.ctx)
	defer cancelStream()

	stream, err := c.client.Checkin(streamCtx)
	if err != nil {
		c.impl.OnError(err)
		return
	}

	var wg sync.WaitGroup
	// recvDone is closed by the receiver when Recv fails; the sender watches
	// it to know when to stop.
	recvDone := make(chan bool)
	wg.Add(2)

	// Receiver: expected state check-ins.
	go func() {
		defer wg.Done()
		for {
			msg, recvErr := stream.Recv()
			if recvErr != nil {
				if !errors.Is(recvErr, io.EOF) {
					c.impl.OnError(recvErr)
				}
				close(recvDone)
				return
			}

			switch {
			case c.expected == proto.StateExpected_STOPPING:
				// Already stopping: any further expected state is ignored.
			case msg.State == proto.StateExpected_STOPPING:
				// Elastic Agent is requesting us to stop.
				c.expected = msg.State
				c.impl.OnStop()
			case msg.ConfigStateIdx != c.cfgIdx:
				// Elastic Agent is requesting us to update config.
				c.cfgMu.Lock()
				c.cfgIdx = msg.ConfigStateIdx
				c.cfg = msg.Config
				c.cfgMu.Unlock()
				c.impl.OnConfig(msg.Config)
			}
		}
	}()

	// Sender: observed state check-ins.
	go func() {
		defer wg.Done()

		// Contents and time of the last check-in that was sent successfully.
		var (
			sentAt      time.Time
			sentCfgIdx  uint64
			sentStatus  proto.StateObserved_Status
			sentMessage string
			sentPayload string
		)

		for {
			// Wake up every 500ms unless the receiver has already finished.
			wake := time.NewTimer(500 * time.Millisecond)
			select {
			case <-recvDone:
				wake.Stop()
				return
			case <-wake.C:
			}

			// Snapshot the current state under the read locks.
			c.cfgMu.RLock()
			curCfgIdx := c.cfgIdx
			c.cfgMu.RUnlock()

			c.obsMu.RLock()
			curStatus, curMessage, curPayload := c.observed, c.observedMessage, c.observedPayload
			c.obsMu.RUnlock()

			// A check-in is due when nothing has been sent yet, when any
			// reported field differs from the last one sent, or when at
			// least c.minCheckTimeout has elapsed since the last send.
			initial := sentAt.IsZero()
			changed := sentCfgIdx != curCfgIdx ||
				sentStatus != curStatus ||
				sentMessage != curMessage ||
				sentPayload != curPayload
			if !initial && !changed && time.Since(sentAt) < c.minCheckTimeout {
				continue
			}

			sendErr := stream.Send(&proto.StateObserved{
				Token:          c.token,
				ConfigStateIdx: curCfgIdx,
				Status:         curStatus,
				Message:        curMessage,
				Payload:        curPayload,
			})
			if sendErr != nil {
				// A failed send ends the sender for this round trip.
				c.impl.OnError(sendErr)
				return
			}

			sentAt = time.Now()
			sentCfgIdx = curCfgIdx
			sentStatus = curStatus
			sentMessage = curMessage
			sentPayload = curPayload
		}
	}()

	// Both goroutines must have stopped before the send side is closed and
	// the caller can start a new stream.
	wg.Wait()
	stream.CloseSend()
}