in agent/dockerclient/dockerapi/docker_client.go [1398:1497]
// Stats starts collecting resource usage statistics for the container
// identified by id and returns a channel of stats samples and a channel of
// errors.
//
// When dg.config.PollMetrics is not enabled (the default), a single streaming
// stats request is opened against the Docker daemon and every decoded sample
// is forwarded on the stats channel. Otherwise one sample is fetched
// immediately and then one per dg.config.PollingMetricsWaitDuration tick.
//
// The stats channel is closed when collection stops for any reason. The error
// channel has capacity 1, receives at most one error, and is never closed.
// Cancelling ctx stops collection; inactivityTimeout is passed to
// dg.inactivityTimeoutHandler and is only used in streaming mode.
func (dg *dockerGoClient) Stats(ctx context.Context, id string, inactivityTimeout time.Duration) (<-chan *types.StatsJSON, <-chan error) {
	subCtx, cancelRequest := context.WithCancel(ctx)

	errC := make(chan error, 1)
	statsC := make(chan *types.StatsJSON)
	client, err := dg.sdkDockerClient()
	if err != nil {
		cancelRequest()
		// errC was just created with capacity 1, so this send cannot block and
		// no goroutine is needed. The caller is expected to consume the error.
		errC <- err
		close(statsC)
		return statsC, errC
	}

	if !dg.config.PollMetrics.Enabled() {
		// Streaming metrics is the default behavior
		seelog.Infof("DockerGoClient: Starting streaming metrics for container %s", id)

		go func() {
			defer cancelRequest()
			defer close(statsC)
			stream := true
			// resp and err are local to this goroutine so that nothing is
			// written to variables owned by the enclosing function.
			resp, err := client.ContainerStats(subCtx, id, stream)
			if err != nil {
				errC <- fmt.Errorf("DockerGoClient: Unable to retrieve stats for container %s: %v", id, err)
				return
			}

			// handle inactivity timeout
			// NOTE(review): the handler presumably sets canceled and calls
			// cancelRequest when no data arrives within inactivityTimeout;
			// confirm against inactivityTimeoutHandler.
			var canceled uint32
			var ch chan<- struct{}
			resp.Body, ch = dg.inactivityTimeoutHandler(resp.Body, inactivityTimeout, cancelRequest, &canceled)
			defer resp.Body.Close()
			defer close(ch)

			decoder := json.NewDecoder(resp.Body)
			data := new(types.StatsJSON)
			for decodeErr := decoder.Decode(data); decodeErr != io.EOF; decodeErr = decoder.Decode(data) {
				if decodeErr != nil {
					errC <- fmt.Errorf("DockerGoClient: Unable to decode stats for container %s: %v", id, decodeErr)
					return
				}
				if atomic.LoadUint32(&canceled) != 0 {
					errC <- fmt.Errorf("DockerGoClient: inactivity time exceeded timeout while retrieving stats for container %s", id)
					return
				}

				select {
				case <-ctx.Done():
					return
				case statsC <- data:
				}

				// Allocate a fresh value for the next sample; the previous one
				// now belongs to the receiver.
				data = new(types.StatsJSON)
			}
		}()
	} else {
		seelog.Infof("DockerGoClient: Starting to Poll for metrics for container %s", id)

		go func() {
			defer cancelRequest()
			defer close(statsC)
			// we need to start by getting container stats so that the task stats
			// endpoint will be populated immediately.
			stats, err := getContainerStatsNotStreamed(client, subCtx, id, pollStatsTimeout)
			if err != nil {
				errC <- err
				return
			}
			select {
			case <-ctx.Done():
				return
			case statsC <- stats:
			}

			// waiting here jitters the time at which the ticker is created, so that
			// containers do not synchronize on calling the docker stats api.
			// the max wait is 80% of the polling interval so that we have a chance to
			// get two stats in the first publishing interval.
			// A timer is used instead of time.Sleep so that cancelling ctx ends
			// the goroutine immediately rather than after the jitter elapses.
			jitterTimer := time.NewTimer(retry.AddJitter(time.Nanosecond, dg.config.PollingMetricsWaitDuration*8/10))
			select {
			case <-ctx.Done():
				jitterTimer.Stop()
				return
			case <-jitterTimer.C:
			}

			statPollTicker := time.NewTicker(dg.config.PollingMetricsWaitDuration)
			defer statPollTicker.Stop()
			for {
				// Wait for the next tick, but give up as soon as ctx is done so
				// that we neither linger for a full interval nor issue a request
				// with an already-cancelled context (which would surface as a
				// spurious error on errC).
				select {
				case <-ctx.Done():
					return
				case <-statPollTicker.C:
				}

				stats, err := getContainerStatsNotStreamed(client, subCtx, id, pollStatsTimeout)
				if err != nil {
					errC <- err
					return
				}
				select {
				case <-ctx.Done():
					return
				case statsC <- stats:
				}
			}
		}()
	}

	return statsC, errC
}