in docker/watcher.go [251:327]
// watch subscribes to the Docker event stream (container events only) and
// dispatches each event to the watcher: "start" and "update" actions go to
// containerUpdate, "die" goes to containerDelete, everything else is ignored.
//
// The subscription is re-established, after a one second pause, whenever the
// stream ends with an error other than context cancellation, or when no event
// has been received for longer than dockerEventsWatchPityTimerTimeout. Each
// new subscription requests events since the timestamp of the last event that
// was received, so it resumes from where the previous one stopped.
//
// watch returns once w.ctx is done (or the stream reports context.Canceled),
// and signals w.stopped on the way out.
func (w *watcher) watch() {
	defer w.stopped.Done()

	filter := filters.NewArgs()
	filter.Add("type", "container")

	// Ticker to restart the watcher when no events are received after some time.
	tickChan := time.NewTicker(dockerEventsWatchPityTimerInterval)
	defer tickChan.Stop()

	// Timestamp of the most recent event; shared across subscriptions so a
	// reconnect asks only for events newer than what was already seen.
	lastValidTimestamp := w.clock.Now()

	// watchOnce runs a single subscription. It reports true when the watcher
	// is finished for good and false when the caller should reconnect.
	watchOnce := func() bool {
		lastReceivedEventTime := w.clock.Now()

		w.log.Debugf("Fetching events since %s", lastValidTimestamp)

		options := events.ListOptions{
			Since:   lastValidTimestamp.Format(time.RFC3339Nano),
			Filters: filter,
		}

		// A child context per subscription, canceled when this attempt returns.
		ctx, cancel := context.WithCancel(w.ctx)
		defer cancel()

		eventsCh, errCh := w.client.Events(ctx, options)
		for {
			select {
			case event, ok := <-eventsCh:
				if !ok {
					// A closed channel would otherwise yield zero-value
					// events forever and reset the timestamp to the epoch.
					w.log.Debug("Events channel closed, restarting watch call")
					return false
				}

				w.log.Debugf("Got a new docker event: %v", event)

				// Prefer the nanosecond timestamp, fall back to seconds.
				if event.TimeNano > 0 {
					lastValidTimestamp = time.Unix(0, event.TimeNano)
				} else {
					lastValidTimestamp = time.Unix(event.Time, 0)
				}
				lastReceivedEventTime = w.clock.Now()

				switch event.Action {
				case "start", "update":
					w.containerUpdate(event)
				case "die":
					w.containerDelete(event)
				}

			case err := <-errCh:
				switch {
				case errors.Is(err, io.EOF):
					// Client disconnected, watch is not done, reconnect
					w.log.Debug("EOF received in events stream, restarting watch call")
				case errors.Is(err, context.DeadlineExceeded):
					w.log.Debug("Context deadline exceeded for docker request, restarting watch call")
				case errors.Is(err, context.Canceled):
					// Parent context has been canceled, watch is done.
					return true
				default:
					w.log.Errorf("Error watching for docker events: %+v", err)
				}
				return false

			case <-tickChan.C:
				// Measure idleness with the same clock that produced
				// lastReceivedEventTime rather than the wall clock.
				if w.clock.Now().Sub(lastReceivedEventTime) > dockerEventsWatchPityTimerTimeout {
					w.log.Infof("No events received within %s, restarting watch call", dockerEventsWatchPityTimerTimeout)
					return false
				}

			case <-w.ctx.Done():
				w.log.Debug("Watcher stopped")
				return true
			}
		}
	}

	for !watchOnce() {
		// Wait before trying to reconnect, but give up immediately if the
		// watcher is stopped in the meantime instead of sleeping it out.
		select {
		case <-w.ctx.Done():
			w.log.Debug("Watcher stopped")
			return
		case <-time.After(1 * time.Second):
		}
	}
}