func()

in agent/dockerclient/dockerapi/docker_client.go [1398:1497]


func (dg *dockerGoClient) Stats(ctx context.Context, id string, inactivityTimeout time.Duration) (<-chan *types.StatsJSON, <-chan error) {
	subCtx, cancelRequest := context.WithCancel(ctx)

	errC := make(chan error, 1)
	statsC := make(chan *types.StatsJSON)
	client, err := dg.sdkDockerClient()
	if err != nil {
		cancelRequest()
		go func() {
			// upstream function should consume error
			errC <- err
			close(statsC)
		}()
		return statsC, errC
	}

	var resp types.ContainerStats
	if !dg.config.PollMetrics.Enabled() {
		// Streaming metrics is the default behavior
		seelog.Infof("DockerGoClient: Starting streaming metrics for container %s", id)
		go func() {
			defer cancelRequest()
			defer close(statsC)
			stream := true
			resp, err = client.ContainerStats(subCtx, id, stream)
			if err != nil {
				errC <- fmt.Errorf("DockerGoClient: Unable to retrieve stats for container %s: %v", id, err)
				return
			}

			// handle inactivity timeout
			var canceled uint32
			var ch chan<- struct{}
			resp.Body, ch = dg.inactivityTimeoutHandler(resp.Body, inactivityTimeout, cancelRequest, &canceled)
			defer resp.Body.Close()
			defer close(ch)

			decoder := json.NewDecoder(resp.Body)
			data := new(types.StatsJSON)
			for err := decoder.Decode(data); err != io.EOF; err = decoder.Decode(data) {
				if err != nil {
					errC <- fmt.Errorf("DockerGoClient: Unable to decode stats for container %s: %v", id, err)
					return
				}
				if atomic.LoadUint32(&canceled) != 0 {
					errC <- fmt.Errorf("DockerGoClient: inactivity time exceeded timeout while retrieving stats for container %s", id)
					return
				}

				select {
				case <-ctx.Done():
					return
				case statsC <- data:
				}

				data = new(types.StatsJSON)
			}
		}()
	} else {
		seelog.Infof("DockerGoClient: Starting to Poll for metrics for container %s", id)

		go func() {
			defer cancelRequest()
			defer close(statsC)
			// we need to start by getting container stats so that the task stats
			// endpoint will be populated immediately.
			stats, err := getContainerStatsNotStreamed(client, subCtx, id, pollStatsTimeout)
			if err != nil {
				errC <- err
				return
			}
			select {
			case <-ctx.Done():
				return
			case statsC <- stats:
			}
			// sleeping here jitters the time at which the ticker is created, so that
			// containers do not synchronize on calling the docker stats api.
			// the max sleep is 80% of the polling interval so that we have a chance to
			// get two stats in the first publishing interval.
			time.Sleep(retry.AddJitter(time.Nanosecond, dg.config.PollingMetricsWaitDuration*8/10))
			statPollTicker := time.NewTicker(dg.config.PollingMetricsWaitDuration)
			defer statPollTicker.Stop()
			for range statPollTicker.C {
				stats, err := getContainerStatsNotStreamed(client, subCtx, id, pollStatsTimeout)
				if err != nil {
					errC <- err
					return
				}
				select {
				case <-ctx.Done():
					return
				case statsC <- stats:
				}
			}
		}()
	}

	return statsC, errC
}