func()

in docker/watcher.go [251:327]


// watch consumes Docker container events and dispatches them to the
// container update/delete handlers until w.ctx is canceled. It marks
// w.stopped as done when it returns.
//
// The event stream is re-established (after a short pause) whenever it
// ends with an error or when no event has been received for longer than
// dockerEventsWatchPityTimerTimeout. Each new stream asks for events since
// the timestamp of the last event that was received.
func (w *watcher) watch() {
	defer w.stopped.Done()

	filter := filters.NewArgs()
	filter.Add("type", "container")

	// Ticker to restart the watcher when no events are received after some time.
	pityTicker := time.NewTicker(dockerEventsWatchPityTimerInterval)
	defer pityTicker.Stop()

	lastValidTimestamp := w.clock.Now()

	// watchOnce runs a single events stream. It returns true when the
	// watcher must stop for good and false when the stream should be
	// re-established.
	watchOnce := func() bool {
		lastReceivedEventTime := w.clock.Now()

		w.log.Debugf("Fetching events since %s", lastValidTimestamp)

		options := events.ListOptions{
			Since:   lastValidTimestamp.Format(time.RFC3339Nano),
			Filters: filter,
		}

		// Per-stream context so the underlying request is released as soon
		// as this attempt ends, whatever the reason.
		ctx, cancel := context.WithCancel(w.ctx)
		defer cancel()

		// Named so they do not shadow the events package.
		eventCh, errCh := w.client.Events(ctx, options)
		for {
			select {
			case event := <-eventCh:
				w.log.Debugf("Got a new docker event: %v", event)
				// Prefer the nanosecond timestamp when the event carries one.
				if event.TimeNano > 0 {
					lastValidTimestamp = time.Unix(0, event.TimeNano)
				} else {
					lastValidTimestamp = time.Unix(event.Time, 0)
				}
				lastReceivedEventTime = w.clock.Now()

				switch event.Action {
				case "start", "update":
					w.containerUpdate(event)
				case "die":
					w.containerDelete(event)
				}
			case err := <-errCh:
				switch {
				case errors.Is(err, io.EOF):
					// Client disconnected, watch is not done, reconnect
					w.log.Debug("EOF received in events stream, restarting watch call")
				case errors.Is(err, context.DeadlineExceeded):
					w.log.Debug("Context deadline exceeded for docker request, restarting watch call")
				case errors.Is(err, context.Canceled):
					// Parent context has been canceled, watch is done.
					return true
				default:
					w.log.Errorf("Error watching for docker events: %+v", err)
				}
				return false
			case <-pityTicker.C:
				// Measure idleness with the same clock that produced
				// lastReceivedEventTime, so both sides of the comparison
				// come from one time source.
				if w.clock.Now().Sub(lastReceivedEventTime) > dockerEventsWatchPityTimerTimeout {
					w.log.Infof("No events received within %s, restarting watch call", dockerEventsWatchPityTimerTimeout)
					return false
				}
			case <-w.ctx.Done():
				w.log.Debug("Watcher stopped")
				return true
			}
		}
	}

	for !watchOnce() {
		// Wait before trying to reconnect, but stop waiting as soon as the
		// watcher is asked to shut down.
		retry := time.NewTimer(1 * time.Second)
		select {
		case <-retry.C:
		case <-w.ctx.Done():
			retry.Stop()
			w.log.Debug("Watcher stopped")
			return
		}
	}
}