in agentendpoint/agentendpoint.go [365:431]
func (c *Client) WaitForTaskNotification(ctx context.Context) {
c.mx.Lock()
defer c.mx.Unlock()
if c.cancel != nil {
// WaitForTaskNotification is already running on this client.
return
}
clog.Debugf(ctx, "Running WaitForTaskNotification")
ctx, c.cancel = context.WithCancel(ctx)
clog.Debugf(ctx, "Checking local state file for saved task.")
if err := c.loadTaskFromState(ctx); err != nil {
clog.Errorf(ctx, "%v", err.Error())
}
clog.Debugf(ctx, "Setting up ReceiveTaskNotification stream watcher.")
go func() {
var resourceExhausted int
var errs int
var sleep time.Duration
for {
select {
case <-ctx.Done():
// We have been canceled.
clog.Debugf(ctx, "Disabling WaitForTaskNotification")
return
default:
}
if err := c.waitForTask(ctx); err != nil {
if errors.Is(err, errServiceNotEnabled) {
// Service is disabled, close this client and return.
clog.Warningf(ctx, "OSConfig Service is disabled.")
c.Close()
return
}
var ndr *metadata.NotDefinedError
if errors.As(err, &ndr) {
// No service account setup for this instance, close this client and return.
clog.Warningf(ctx, "No service account set for instance.")
c.Close()
return
}
if errors.Is(err, errResourceExhausted) {
resourceExhausted++
sleep = retryutil.RetrySleep(resourceExhausted, 5)
} else {
// Retry any other errors with a modest backoff. Only retry up to 10
// times, at that point return, the client will be recreated during the next
// cycle.
errs++
clog.Warningf(ctx, "Error waiting for task (attempt %d of 10): %v", errs, err)
resourceExhausted = 0
if errs >= 10 {
c.Close()
return
}
sleep = retryutil.RetrySleep(errs, 0)
}
time.Sleep(sleep)
continue
}
errs = 0
}
}()
}