// docker/watcher.go

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//go:build linux || darwin || windows
// +build linux darwin windows

package docker

import (
	"context"
	"errors"
	"io"
	"net/http"
	"sync"
	"time"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/events"
	"github.com/docker/docker/api/types/filters"
	"github.com/docker/go-connections/tlsconfig"

	"github.com/elastic/elastic-agent-autodiscover/bus"
	"github.com/elastic/elastic-agent-libs/logp"
)

// Constants for container short ID handling and Docker API request/watch timeouts
const (
	shortIDLen                         = 12
	dockerRequestTimeout               = 10 * time.Second
	dockerEventsWatchPityTimerInterval = 10 * time.Second
	dockerEventsWatchPityTimerTimeout  = 10 * time.Minute
)

// Watcher reads docker events and keeps a list of known containers
type Watcher interface {
// Start watching docker API for new containers
Start() error
// Stop watching docker API for new containers
Stop()
// Container returns the running container with the given ID or nil if unknown
Container(ID string) *Container
// Containers returns the list of known containers
Containers() map[string]*Container
// ListenStart returns a bus listener to receive container started events, with a `container` key holding it
ListenStart() bus.Listener
// ListenStop returns a bus listener to receive container stopped events, with a `container` key holding it
ListenStop() bus.Listener
}
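
// Illustrative sketch, not part of the original file: one way a consumer might
// drive a Watcher and react to the start/stop events it publishes. The function
// name consumeWatcherEvents is hypothetical, and it assumes the bus Listener
// exposes received events through an Events() channel.
func consumeWatcherEvents(ctx context.Context, w Watcher) error {
	if err := w.Start(); err != nil {
		return err
	}
	defer w.Stop()

	started := w.ListenStart()
	defer started.Stop()
	stopped := w.ListenStop()
	defer stopped.Stop()

	for {
		select {
		case e := <-started.Events():
			if c, ok := e["container"].(*Container); ok {
				_ = c // container that just started or was updated
			}
		case e := <-stopped.Events():
			if c, ok := e["container"].(*Container); ok {
				_ = c // container that just died
			}
		case <-ctx.Done():
			return nil
		}
	}
}
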
// TLSConfig for docker socket connection
type TLSConfig struct {
CA string `config:"certificate_authority"`
Certificate string `config:"certificate"`
Key string `config:"key"`
}
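
// Illustrative sketch, not part of the original file: building a watcher for a
// TLS-protected Docker socket. The host and certificate paths are hypothetical,
// and the function name newTLSWatcherSketch is not part of the package API.
func newTLSWatcherSketch(log *logp.Logger) (Watcher, error) {
	tls := &TLSConfig{
		CA:          "/etc/docker/ca.pem",
		Certificate: "/etc/docker/cert.pem",
		Key:         "/etc/docker/key.pem",
	}
	// storeShortID=true also indexes containers by their 12-character short ID.
	return NewWatcher(log, "tcp://docker-host:2376", tls, true)
}
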
type watcher struct {
sync.RWMutex
log *logp.Logger
client Client
ctx context.Context
stop context.CancelFunc
containers map[string]*Container
deleted map[string]time.Time // deleted annotations key -> last access time
cleanupTimeout time.Duration
clock clock
stopped sync.WaitGroup
bus bus.Bus
shortID bool // whether to store short ID in "containers" too
}

// clock is an interface used to provide a mocked time source in tests
type clock interface {
Now() time.Time
}

// systemClock implements the clock interface using the system clock via the time package
type systemClock struct{}

// Now returns the current time
func (*systemClock) Now() time.Time { return time.Now() }
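
// Illustrative sketch, not part of the original file: a fixed clock satisfying
// the clock interface, handy for exercising the cleanup-timeout logic in tests.
// The name fixedClock is hypothetical.
type fixedClock struct{ now time.Time }

// Now returns the preset time instead of the system time.
func (c *fixedClock) Now() time.Time { return c.now }
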
// Container info retrieved by the watcher
type Container struct {
ID string
Name string
Image string
Labels map[string]string
IPAddresses []string
Ports []container.Port
}

// Client is the subset of the Docker API client used by the watcher
type Client interface {
ContainerList(ctx context.Context, options container.ListOptions) ([]container.Summary, error)
ContainerInspect(ctx context.Context, container string) (container.InspectResponse, error)
Events(ctx context.Context, options events.ListOptions) (<-chan events.Message, <-chan error)
}

// WatcherConstructor represents a function that creates a new Watcher from the given parameters
type WatcherConstructor func(logp *logp.Logger, host string, tls *TLSConfig, storeShortID bool) (Watcher, error)
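
// Illustrative sketch, not part of the original file: NewWatcher already
// satisfies WatcherConstructor, so callers can hold the constructor as a value
// and swap it out in tests. The variable name defaultWatcherConstructor is
// hypothetical.
var defaultWatcherConstructor WatcherConstructor = NewWatcher
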
// NewWatcher returns a watcher running for the given settings
func NewWatcher(log *logp.Logger, host string, tls *TLSConfig, storeShortID bool) (Watcher, error) {
var httpClient *http.Client
if tls != nil {
options := tlsconfig.Options{
CAFile: tls.CA,
CertFile: tls.Certificate,
KeyFile: tls.Key,
}
tlsc, err := tlsconfig.Client(options)
if err != nil {
return nil, err
}
httpClient = &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsc,
},
}
}
client, err := NewClient(host, httpClient, nil)
if err != nil {
return nil, err
}
// Extra check to confirm that Docker is available
_, err = client.Info(context.Background())
if err != nil {
client.Close()
return nil, err
}
return NewWatcherWithClient(log, client, 60*time.Second, storeShortID)
}

// NewWatcherWithClient creates a new Watcher from a given Docker client
func NewWatcherWithClient(log *logp.Logger, client Client, cleanupTimeout time.Duration, storeShortID bool) (Watcher, error) {
ctx, cancel := context.WithCancel(context.Background())
return &watcher{
log: log,
client: client,
ctx: ctx,
stop: cancel,
containers: make(map[string]*Container),
deleted: make(map[string]time.Time),
cleanupTimeout: cleanupTimeout,
bus: bus.New(log, "docker"),
shortID: storeShortID,
clock: &systemClock{},
}, nil
}
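
// Illustrative sketch, not part of the original file: a minimal in-memory
// Client, useful for driving NewWatcherWithClient in tests without a Docker
// daemon. The type name fakeDockerClient and its fields are hypothetical.
type fakeDockerClient struct {
	containers []container.Summary
	eventsCh   chan events.Message
	errsCh     chan error
}

// ContainerList returns the canned container summaries.
func (f *fakeDockerClient) ContainerList(_ context.Context, _ container.ListOptions) ([]container.Summary, error) {
	return f.containers, nil
}

// ContainerInspect returns an empty response; tests exercising the host-network
// fallback would populate Config.Hostname here.
func (f *fakeDockerClient) ContainerInspect(_ context.Context, _ string) (container.InspectResponse, error) {
	return container.InspectResponse{}, nil
}

// Events hands back the channels the test writes events and errors into.
func (f *fakeDockerClient) Events(_ context.Context, _ events.ListOptions) (<-chan events.Message, <-chan error) {
	return f.eventsCh, f.errsCh
}
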
// Container returns the running container with the given ID or nil if unknown
func (w *watcher) Container(ID string) *Container {
w.RLock()
container := w.containers[ID]
if container == nil {
w.RUnlock()
return nil
}
_, ok := w.deleted[container.ID]
w.RUnlock()
// Update last access time if it's deleted
if ok {
w.Lock()
w.deleted[container.ID] = w.clock.Now()
w.Unlock()
}
return container
}

// Containers returns the list of known containers
func (w *watcher) Containers() map[string]*Container {
w.RLock()
defer w.RUnlock()
res := make(map[string]*Container)
for k, v := range w.containers {
if !w.shortID || len(k) != shortIDLen {
res[k] = v
}
}
return res
}
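
// Illustrative sketch, not part of the original file: with storeShortID enabled,
// Container resolves both the full 64-character ID and its 12-character short
// form to the same entry, while Containers lists each container only once under
// its full ID. The helper name lookupByEitherID is hypothetical.
func lookupByEitherID(w Watcher, fullID string) (byFull, byShort *Container) {
	return w.Container(fullID), w.Container(fullID[:shortIDLen])
}
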
// Start watching docker API for new containers
func (w *watcher) Start() error {
// Do initial scan of existing containers
w.log.Debug("Start docker containers scanner")
w.Lock()
defer w.Unlock()
containers, err := w.listContainers(container.ListOptions{})
if err != nil {
return err
}
for _, c := range containers {
w.containers[c.ID] = c
if w.shortID {
w.containers[c.ID[:shortIDLen]] = c
}
}
	// Emit all start events (avoid blocking if the bus gets blocked)
go func() {
for _, c := range containers {
w.bus.Publish(bus.Event{
"start": true,
"container": c,
})
}
}()
w.stopped.Add(2)
go w.watch()
go w.cleanupWorker()
return nil
}

// Stop watching docker API for new containers
func (w *watcher) Stop() {
w.stop()
w.stopped.Wait()
}

// watch subscribes to the Docker events API and keeps the container list up to
// date, restarting the subscription when it errors or goes quiet for too long.
func (w *watcher) watch() {
defer w.stopped.Done()
filter := filters.NewArgs()
filter.Add("type", "container")
// Ticker to restart the watcher when no events are received after some time.
tickChan := time.NewTicker(dockerEventsWatchPityTimerInterval)
defer tickChan.Stop()
lastValidTimestamp := w.clock.Now()
watch := func() bool {
lastReceivedEventTime := w.clock.Now()
w.log.Debugf("Fetching events since %s", lastValidTimestamp)
options := events.ListOptions{
Since: lastValidTimestamp.Format(time.RFC3339Nano),
Filters: filter,
}
ctx, cancel := context.WithCancel(w.ctx)
defer cancel()
events, errs := w.client.Events(ctx, options)
for {
select {
case event := <-events:
w.log.Debugf("Got a new docker event: %v", event)
if event.TimeNano > 0 {
lastValidTimestamp = time.Unix(0, event.TimeNano)
} else {
lastValidTimestamp = time.Unix(event.Time, 0)
}
lastReceivedEventTime = w.clock.Now()
switch event.Action {
case "start", "update":
w.containerUpdate(event)
case "die":
w.containerDelete(event)
}
case err := <-errs:
if errors.Is(err, io.EOF) {
// Client disconnected, watch is not done, reconnect
w.log.Debug("EOF received in events stream, restarting watch call")
} else if errors.Is(err, context.DeadlineExceeded) {
w.log.Debug("Context deadline exceeded for docker request, restarting watch call")
} else if errors.Is(err, context.Canceled) {
// Parent context has been canceled, watch is done.
return true
} else {
w.log.Errorf("Error watching for docker events: %+v", err)
}
return false
case <-tickChan.C:
if time.Since(lastReceivedEventTime) > dockerEventsWatchPityTimerTimeout {
w.log.Infof("No events received within %s, restarting watch call", dockerEventsWatchPityTimerTimeout)
return false
}
case <-w.ctx.Done():
w.log.Debug("Watcher stopped")
return true
}
}
}
for {
done := watch()
if done {
return
}
// Wait before trying to reconnect
time.Sleep(1 * time.Second)
}
}

// containerUpdate refreshes the cached metadata for the container referenced by
// the event and publishes a "start" bus event.
func (w *watcher) containerUpdate(event events.Message) {
filter := filters.NewArgs()
filter.Add("id", event.Actor.ID)
containers, err := w.listContainers(container.ListOptions{
Filters: filter,
})
if err != nil || len(containers) != 1 {
w.log.Errorf("Error getting container info: %v", err)
return
}
container := containers[0]
w.Lock()
w.containers[event.Actor.ID] = container
if w.shortID {
w.containers[event.Actor.ID[:shortIDLen]] = container
}
// un-delete if it's flagged (in case of update or recreation)
delete(w.deleted, event.Actor.ID)
w.Unlock()
w.bus.Publish(bus.Event{
"start": true,
"container": container,
})
}

// containerDelete flags the container as deleted and publishes a "stop" bus
// event if the container was known.
func (w *watcher) containerDelete(event events.Message) {
container := w.Container(event.Actor.ID)
w.Lock()
w.deleted[event.Actor.ID] = w.clock.Now()
w.Unlock()
if container != nil {
w.bus.Publish(bus.Event{
"stop": true,
"container": container,
})
}
}

// listContainers queries the Docker API for containers matching the given
// options and normalizes them into Container values.
func (w *watcher) listContainers(options container.ListOptions) ([]*Container, error) {
log := w.log
log.Debug("List containers")
ctx, cancel := context.WithTimeout(w.ctx, dockerRequestTimeout)
defer cancel()
containers, err := w.client.ContainerList(ctx, options)
if err != nil {
return nil, err
}
result := make([]*Container, len(containers))
for idx, c := range containers {
var ipaddresses []string
if c.NetworkSettings != nil {
			// Handle alternate platforms like VMware's VIC that might not have this data.
for _, net := range c.NetworkSettings.Networks {
if net.IPAddress != "" {
ipaddresses = append(ipaddresses, net.IPAddress)
}
}
}
		// If there are no network interfaces, assume the container is on the host network.
		// In that case, inspect the container directly and use its hostname as the IP address.
if len(ipaddresses) == 0 {
log.Debugf("Inspect container %s", c.ID)
ctx, cancel := context.WithTimeout(w.ctx, dockerRequestTimeout)
defer cancel()
info, err := w.client.ContainerInspect(ctx, c.ID)
if err == nil {
ipaddresses = append(ipaddresses, info.Config.Hostname)
} else {
log.Warnf("unable to inspect container %s due to error %+v", c.ID, err)
}
}
result[idx] = &Container{
ID: c.ID,
Name: c.Names[0][1:], // Strip '/' from container names
Image: c.Image,
Labels: c.Labels,
Ports: c.Ports,
IPAddresses: ipaddresses,
}
}
return result, nil
}

// Clean up deleted containers after they are no longer in use
func (w *watcher) cleanupWorker() {
defer w.stopped.Done()
for {
select {
case <-w.ctx.Done():
return
// Wait a full period
case <-time.After(w.cleanupTimeout):
w.runCleanup()
}
}
}

// runCleanup publishes "delete" bus events for deleted containers whose
// cool-down period has expired and removes them from the internal maps.
func (w *watcher) runCleanup() {
// Check entries for timeout
var toDelete []string
timeout := w.clock.Now().Add(-w.cleanupTimeout)
w.RLock()
for key, lastSeen := range w.deleted {
if lastSeen.Before(timeout) {
w.log.Debugf("Removing container %s after cool down timeout", key)
toDelete = append(toDelete, key)
}
}
w.RUnlock()
// Delete timed out entries:
for _, key := range toDelete {
container := w.Container(key)
if container != nil {
w.bus.Publish(bus.Event{
"delete": true,
"container": container,
})
}
}
w.Lock()
for _, key := range toDelete {
delete(w.deleted, key)
delete(w.containers, key)
if w.shortID {
delete(w.containers, key[:shortIDLen])
}
}
w.Unlock()
}

// ListenStart returns a bus listener to receive container started events, with a `container` key holding it
func (w *watcher) ListenStart() bus.Listener {
return w.bus.Subscribe("start")
}

// ListenStop returns a bus listener to receive container stopped events, with a `container` key holding it
func (w *watcher) ListenStop() bus.Listener {
return w.bus.Subscribe("stop")
}