func()

in agentendpoint/agentendpoint.go [365:431]


// WaitForTaskNotification starts a background goroutine that repeatedly calls
// c.waitForTask until ctx is canceled or the client is closed because of an
// unrecoverable or repeated error.
//
// The call is a no-op when c.cancel is already set, which is how an already
// running watcher is detected. Before the goroutine is started, any task saved
// in the local state file is loaded via c.loadTaskFromState; a failure there
// is logged and does not prevent the watcher from starting.
//
// c.mx is held for the duration of this call, but not by the spawned
// goroutine.
func (c *Client) WaitForTaskNotification(ctx context.Context) {
	c.mx.Lock()
	defer c.mx.Unlock()
	if c.cancel != nil {
		// WaitForTaskNotification is already running on this client.
		return
	}
	clog.Debugf(ctx, "Running WaitForTaskNotification")
	// Derive a cancelable context; the cancel func is stored on the client and
	// doubles as the "already running" marker checked above.
	ctx, c.cancel = context.WithCancel(ctx)

	clog.Debugf(ctx, "Checking local state file for saved task.")
	if err := c.loadTaskFromState(ctx); err != nil {
		clog.Errorf(ctx, "%v", err.Error())
	}

	clog.Debugf(ctx, "Setting up ReceiveTaskNotification stream watcher.")
	go func() {
		// resourceExhausted and errs count failures of each kind and drive the
		// backoff passed to retryutil.RetrySleep.
		var resourceExhausted int
		var errs int
		var sleep time.Duration
		for {
			// Non-blocking cancellation check before every attempt.
			select {
			case <-ctx.Done():
				// We have been canceled.
				clog.Debugf(ctx, "Disabling WaitForTaskNotification")
				return
			default:
			}

			if err := c.waitForTask(ctx); err != nil {
				if errors.Is(err, errServiceNotEnabled) {
					// Service is disabled, close this client and return.
					clog.Warningf(ctx, "OSConfig Service is disabled.")
					c.Close()
					return
				}
				var ndr *metadata.NotDefinedError
				if errors.As(err, &ndr) {
					// No service account setup for this instance, close this client and return.
					clog.Warningf(ctx, "No service account set for instance.")
					c.Close()
					return
				}

				if errors.Is(err, errResourceExhausted) {
					resourceExhausted++
					sleep = retryutil.RetrySleep(resourceExhausted, 5)
				} else {
					// Retry any other errors with a modest backoff. Once the error
					// count reaches 10, close the client and return; the client will
					// be recreated during the next cycle.
					errs++
					clog.Warningf(ctx, "Error waiting for task (attempt %d of 10): %v", errs, err)
					resourceExhausted = 0
					if errs >= 10 {
						c.Close()
						return
					}
					sleep = retryutil.RetrySleep(errs, 0)
				}

				// Back off before retrying, but wake up immediately if ctx is
				// canceled so the goroutine does not outlive cancellation by a
				// full backoff interval. Either way, control returns to the top
				// of the loop, where the cancellation check logs and exits.
				timer := time.NewTimer(sleep)
				select {
				case <-ctx.Done():
					timer.Stop()
				case <-timer.C:
				}
				continue
			}
			// A successful wait resets the generic error counter.
			errs = 0
		}
	}()
}