agent/stats/queue.go
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.
package stats
import (
"fmt"
"math"
"sync"
"time"
"github.com/aws/amazon-ecs-agent/ecs-agent/logger"
loggerfield "github.com/aws/amazon-ecs-agent/ecs-agent/logger/field"
"github.com/aws/amazon-ecs-agent/ecs-agent/stats"
"github.com/aws/amazon-ecs-agent/ecs-agent/tcs/model/ecstcs"
"github.com/cihub/seelog"
"github.com/docker/docker/api/types"
"github.com/pkg/errors"
)
const (
// BytesInMiB is the number of bytes in a MebiByte.
BytesInMiB = 1024 * 1024
// MaxCPUUsagePerc is the maximum CPU usage percentage accepted by the
// backend: 100% for each of up to 1024 CPU shares.
MaxCPUUsagePerc float32 = 1024 * 100
// NanoSecToSec is the number of nanoseconds in one second.
NanoSecToSec float32 = 1000000000
)
// Queue abstracts a queue of container usage statistics backed by a UsageStats slice.
type Queue struct {
buffer []UsageStats
maxSize int
lastStat *types.StatsJSON
lastNetworkStatPerSec *stats.NetworkStatsPerSec
lock sync.RWMutex
}
// NewQueue creates a queue.
func NewQueue(maxSize int) *Queue {
return &Queue{
buffer: make([]UsageStats, 0, maxSize),
maxSize: maxSize,
}
}
// Reset marks all metrics currently in the queue's buffer as sent, so that only
// new metrics added after this point will be sent to the backend when calling
// stat getter functions like GetCPUStatsSet, GetMemoryStatsSet, etc.
func (queue *Queue) Reset() {
queue.lock.Lock()
defer queue.lock.Unlock()
for i := range queue.buffer {
queue.buffer[i].sent = true
}
}
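// Illustrative usage sketch (not part of the original file; the variable names,
// the 60-sample capacity, and the surrounding publish flow are assumptions): a
// typical cycle creates a queue, adds Docker stats as they arrive, reads the
// aggregated stats sets, and then calls Reset so the same samples are not sent
// to the backend twice.
//
//	queue := NewQueue(60)
//	if err := queue.Add(dockerStat, NonDockerContainerStats{}); err != nil {
//		// handle the conversion error
//	}
//	cpuSet, err := queue.GetCPUStatsSet()
//	if err == nil {
//		// publish cpuSet to the backend
//	}
//	queue.Reset()
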
// AddContainerStat adds a new set of stats for a container to the queue. This method is only intended for use while
// processing docker stats for a stats container.
func (queue *Queue) AddContainerStat(dockerStat *types.StatsJSON, nonDockerStats NonDockerContainerStats,
lastStatBeforeLastRestart *types.StatsJSON, containerHasRestartedBefore bool) error {
if containerHasRestartedBefore {
dockerStat = getAggregatedDockerStatAcrossRestarts(dockerStat, lastStatBeforeLastRestart, queue.GetLastStat())
}
return queue.Add(dockerStat, nonDockerStats)
}
// Add adds a new set of stats to the queue.
func (queue *Queue) Add(dockerStat *types.StatsJSON, nonDockerStats NonDockerContainerStats) error {
queue.setLastStat(dockerStat)
stat, err := dockerStatsToContainerStats(dockerStat)
if err != nil {
return err
}
if nonDockerStats.restartCount != nil {
stat.restartCount = nonDockerStats.restartCount
}
queue.add(stat)
return nil
}
func (queue *Queue) setLastStat(stat *types.StatsJSON) {
queue.lock.Lock()
defer queue.lock.Unlock()
queue.lastStat = stat
}
func (queue *Queue) add(rawStat *ContainerStats) {
queue.lock.Lock()
defer queue.lock.Unlock()
queueLength := len(queue.buffer)
stat := UsageStats{
CPUUsagePerc: float32(nan32()),
RestartCount: rawStat.restartCount,
MemoryUsageInMegs: uint32(rawStat.memoryUsage / BytesInMiB),
StorageReadBytes: rawStat.storageReadBytes,
StorageWriteBytes: rawStat.storageWriteBytes,
NetworkStats: rawStat.networkStats,
Timestamp: rawStat.timestamp,
cpuUsage: rawStat.cpuUsage,
sent: false,
}
if queueLength != 0 {
// % utilization can be calculated only when queue is non-empty.
lastStat := queue.buffer[queueLength-1]
timeSinceLastStat := float32(rawStat.timestamp.Sub(lastStat.Timestamp).Nanoseconds())
if timeSinceLastStat <= 0 {
// if we got a duplicate timestamp, set cpu percentage to the same value as the previous stat
seelog.Errorf("Received a docker stat object with duplicate timestamp")
stat.CPUUsagePerc = lastStat.CPUUsagePerc
if stat.NetworkStats != nil && lastStat.NetworkStats != nil {
stat.NetworkStats.RxBytesPerSecond = lastStat.NetworkStats.RxBytesPerSecond
stat.NetworkStats.TxBytesPerSecond = lastStat.NetworkStats.TxBytesPerSecond
}
} else {
cpuUsageSinceLastStat := float32(rawStat.cpuUsage - lastStat.cpuUsage)
stat.CPUUsagePerc = 100 * cpuUsageSinceLastStat / timeSinceLastStat
// Calculate per-second network metrics.
if stat.NetworkStats != nil && lastStat.NetworkStats != nil {
rxBytesSinceLastStat := float32(stat.NetworkStats.RxBytes - lastStat.NetworkStats.RxBytes)
txBytesSinceLastStat := float32(stat.NetworkStats.TxBytes - lastStat.NetworkStats.TxBytes)
stat.NetworkStats.RxBytesPerSecond = NanoSecToSec * (rxBytesSinceLastStat / timeSinceLastStat)
stat.NetworkStats.TxBytesPerSecond = NanoSecToSec * (txBytesSinceLastStat / timeSinceLastStat)
}
}
if queueLength >= queue.maxSize {
// Remove first element if queue is full.
queue.buffer = queue.buffer[1:queueLength]
}
if stat.CPUUsagePerc > MaxCPUUsagePerc {
// This should not happen; log the inputs to help diagnose the miscalculation.
seelog.Errorf("Calculated CPU usage percent (%.1f) is larger than backend maximum (%.1f). lastStatTS=%s lastStatCPUTime=%d thisStatTS=%s thisStatCPUTime=%d queueLength=%d",
stat.CPUUsagePerc, MaxCPUUsagePerc, lastStat.Timestamp.Format(time.RFC3339Nano), lastStat.cpuUsage, rawStat.timestamp.Format(time.RFC3339Nano), rawStat.cpuUsage, queueLength)
}
if stat.NetworkStats != nil {
networkStatPerSec := &stats.NetworkStatsPerSec{
RxBytesPerSecond: float64(stat.NetworkStats.RxBytesPerSecond),
TxBytesPerSecond: float64(stat.NetworkStats.TxBytesPerSecond),
}
queue.lastNetworkStatPerSec = networkStatPerSec
}
}
queue.buffer = append(queue.buffer, stat)
}
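// Worked example for the calculation above (numbers are illustrative, not taken
// from the original file): if the previous sample had cpuUsage = 100e9 ns and
// the new sample has cpuUsage = 101e9 ns taken 2e9 ns (2 seconds) later, then
// CPUUsagePerc = 100 * (101e9 - 100e9) / 2e9 = 50%. Likewise, if RxBytes grew
// by 1,000,000 over those 2e9 ns, RxBytesPerSecond =
// NanoSecToSec * (1,000,000 / 2e9) = 500,000 bytes per second.
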
// GetLastStat returns the last recorded raw statistics object from Docker.
func (queue *Queue) GetLastStat() *types.StatsJSON {
queue.lock.RLock()
defer queue.lock.RUnlock()
return queue.lastStat
}
// GetLastNetworkStatPerSec returns the most recently calculated per-second
// network stats.
func (queue *Queue) GetLastNetworkStatPerSec() *stats.NetworkStatsPerSec {
queue.lock.RLock()
defer queue.lock.RUnlock()
return queue.lastNetworkStatPerSec
}
// GetCPUStatsSet gets the stats set for CPU utilization.
func (queue *Queue) GetCPUStatsSet() (*ecstcs.CWStatsSet, error) {
return queue.getCWStatsSet(getCPUUsagePerc)
}
// GetMemoryStatsSet gets the stats set for memory utilization.
func (queue *Queue) GetMemoryStatsSet() (*ecstcs.CWStatsSet, error) {
return queue.getCWStatsSet(getMemoryUsagePerc)
}
// GetStorageStatsSet gets the stats set for aggregate storage read and write bytes.
func (queue *Queue) GetStorageStatsSet() (*ecstcs.StorageStatsSet, error) {
storageStatsSet := &ecstcs.StorageStatsSet{}
var err error
var errStr string
storageStatsSet.ReadSizeBytes, err = queue.getULongStatsSet(getStorageReadBytes)
if err != nil {
errStr += fmt.Sprintf("error getting storage read size bytes: %v - ", err)
}
storageStatsSet.WriteSizeBytes, err = queue.getULongStatsSet(getStorageWriteBytes)
if err != nil {
errStr += fmt.Sprintf("error getting storage write size bytes: %v - ", err)
}
var errOut error
if len(errStr) > 0 {
errOut = errors.New(errStr)
}
return storageStatsSet, errOut
}
// GetRestartStatsSet gets the stats set for container restarts
func (queue *Queue) GetRestartStatsSet() (*ecstcs.RestartStatsSet, error) {
return queue.getRestartStatsSet(getRestartCount)
}
func (queue *Queue) getRestartStatsSet(getInt getIntPointerFunc) (*ecstcs.RestartStatsSet, error) {
queue.lock.Lock()
defer queue.lock.Unlock()
var firstStat, lastStat int64
firstStat = -1
queueLength := len(queue.buffer)
if queueLength < 2 {
// Need at least 2 data points to calculate this.
return nil, fmt.Errorf("need at least 2 data points in queue to calculate restart stats set")
}
for i, stat := range queue.buffer {
if stat.sent {
// don't send stats to TACS if already sent
continue
}
thisStat := getInt(&stat)
if thisStat == nil {
continue
}
if i == queueLength-1 {
// get final unsent stat in the queue
lastStat = *thisStat
}
if firstStat == -1 {
// firstStat is unset so set it here
if i > 0 {
// Some stats in the queue were already sent, so diff the last stat
// against the stat just before the first unsent one.
thisStat = getInt(&queue.buffer[i-1])
if thisStat == nil {
continue
}
}
firstStat = *thisStat
}
}
if firstStat == -1 {
// no non-nil stats found, most likely this means that the container
// does not have a restart policy set.
return nil, fmt.Errorf("No non-nil restart count stats found, not sending RestartStatsSet." +
" Most likely the container does not have a restart policy configured")
}
// get the diff in restart count between the first stat and the last stat in the
// queue. Examples:
// [ 0 1 3 3 4 5 ] = 5 - 0 = 5 restarts
// [ 0(sent) 1(sent) 3(sent) 4(unsent) 5(unsent) 7(unsent) ] = 7 - 3 = 4 restarts
result := lastStat - firstStat
if result < 0 {
return nil, fmt.Errorf("Negative restart count calculated, firstStat=%d lastStat=%d result=%d", firstStat, lastStat, result)
}
return &ecstcs.RestartStatsSet{
RestartCount: &result,
}, nil
}
// GetNetworkStatsSet gets the stats set for network metrics.
func (queue *Queue) GetNetworkStatsSet() (*ecstcs.NetworkStatsSet, error) {
networkStatsSet := &ecstcs.NetworkStatsSet{}
var err error
var errStr string
networkStatsSet.RxBytes, err = queue.getULongStatsSet(getNetworkRxBytes)
if err != nil {
errStr += fmt.Sprintf("error getting network rx bytes: %v - ", err)
}
networkStatsSet.RxDropped, err = queue.getULongStatsSet(getNetworkRxDropped)
if err != nil {
errStr += fmt.Sprintf("error getting network rx dropped: %v - ", err)
}
networkStatsSet.RxErrors, err = queue.getULongStatsSet(getNetworkRxErrors)
if err != nil {
errStr += fmt.Sprintf("error getting network rx errors: %v - ", err)
}
networkStatsSet.RxPackets, err = queue.getULongStatsSet(getNetworkRxPackets)
if err != nil {
errStr += fmt.Sprintf("error getting network rx packets: %v - ", err)
}
networkStatsSet.TxBytes, err = queue.getULongStatsSet(getNetworkTxBytes)
if err != nil {
errStr += fmt.Sprintf("error getting network tx bytes: %v - ", err)
}
networkStatsSet.TxDropped, err = queue.getULongStatsSet(getNetworkTxDropped)
if err != nil {
errStr += fmt.Sprintf("error getting network tx dropped: %v - ", err)
}
networkStatsSet.TxErrors, err = queue.getULongStatsSet(getNetworkTxErrors)
if err != nil {
errStr += fmt.Sprintf("error getting network tx errors: %v - ", err)
}
networkStatsSet.TxPackets, err = queue.getULongStatsSet(getNetworkTxPackets)
if err != nil {
errStr += fmt.Sprintf("error getting network tx packets: %v - ", err)
}
networkStatsSet.RxBytesPerSecond, err = queue.getUDoubleCWStatsSet(getNetworkRxPacketsPerSecond)
if err != nil {
errStr += fmt.Sprintf("error getting network rx bytes per second: %v - ", err)
}
networkStatsSet.TxBytesPerSecond, err = queue.getUDoubleCWStatsSet(getNetworkTxPacketsPerSecond)
if err != nil {
errStr += fmt.Sprintf("error getting network tx bytes per second: %v - ", err)
}
var errOut error
if len(errStr) > 0 {
errOut = errors.New(errStr)
}
return networkStatsSet, errOut
}
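// Illustrative note (an assumption about callers, not from the original file):
// because errors are accumulated per field, GetNetworkStatsSet can return a
// partially populated set together with a non-nil error, so a caller may still
// inspect the fields that were filled in:
//
//	netSet, err := queue.GetNetworkStatsSet()
//	if err != nil {
//		// log the error; some fields of netSet may still be usable
//	}
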
func getNetworkRxBytes(s *UsageStats) uint64 {
if s.NetworkStats != nil {
return s.NetworkStats.RxBytes
}
return uint64(0)
}
func getNetworkRxDropped(s *UsageStats) uint64 {
if s.NetworkStats != nil {
return s.NetworkStats.RxDropped
}
return uint64(0)
}
func getNetworkRxErrors(s *UsageStats) uint64 {
if s.NetworkStats != nil {
return s.NetworkStats.RxErrors
}
return uint64(0)
}
func getNetworkRxPackets(s *UsageStats) uint64 {
if s.NetworkStats != nil {
return s.NetworkStats.RxPackets
}
return uint64(0)
}
func getNetworkTxBytes(s *UsageStats) uint64 {
if s.NetworkStats != nil {
return s.NetworkStats.TxBytes
}
return uint64(0)
}
func getNetworkTxDropped(s *UsageStats) uint64 {
if s.NetworkStats != nil {
return s.NetworkStats.TxDropped
}
return uint64(0)
}
func getNetworkTxErrors(s *UsageStats) uint64 {
if s.NetworkStats != nil {
return s.NetworkStats.TxErrors
}
return uint64(0)
}
func getNetworkTxPackets(s *UsageStats) uint64 {
if s.NetworkStats != nil {
return s.NetworkStats.TxPackets
}
return uint64(0)
}
// getNetworkRxPacketsPerSecond returns the received bytes-per-second rate.
// Despite its name, it reads RxBytesPerSecond rather than a packet rate.
func getNetworkRxPacketsPerSecond(s *UsageStats) float64 {
if s.NetworkStats != nil {
return float64(s.NetworkStats.RxBytesPerSecond)
}
return float64(0)
}
// getNetworkTxPacketsPerSecond returns the transmitted bytes-per-second rate.
// Despite its name, it reads TxBytesPerSecond rather than a packet rate.
func getNetworkTxPacketsPerSecond(s *UsageStats) float64 {
if s.NetworkStats != nil {
return float64(s.NetworkStats.TxBytesPerSecond)
}
return float64(0)
}
func getCPUUsagePerc(s *UsageStats) float64 {
return float64(s.CPUUsagePerc)
}
func getMemoryUsagePerc(s *UsageStats) float64 {
return float64(s.MemoryUsageInMegs)
}
func getStorageReadBytes(s *UsageStats) uint64 {
return s.StorageReadBytes
}
func getStorageWriteBytes(s *UsageStats) uint64 {
return s.StorageWriteBytes
}
func getRestartCount(s *UsageStats) *int64 {
return s.RestartCount
}
// getInt64WithOverflow truncates a uint64 to fit into an int64 and returns
// the overflow as a second int64.
func getInt64WithOverflow(uintStat uint64) (int64, int64) {
if uintStat > math.MaxInt64 {
overflow := int64(uintStat % uint64(math.MaxInt64))
return math.MaxInt64, overflow
}
return int64(uintStat), int64(0)
}
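// Worked example (illustrative values): for uintStat = math.MaxInt64 + 5 the
// value cannot fit in an int64, so the function returns (math.MaxInt64, 5):
// the base is capped at math.MaxInt64 and the overflow carries the remainder
// uintStat % math.MaxInt64. For uintStat = 42 it simply returns (42, 0).
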
type getUsageFloatFunc func(*UsageStats) float64
type getUsageIntFunc func(*UsageStats) uint64
type getIntPointerFunc func(*UsageStats) *int64
// getCWStatsSet gets the stats set for either CPU or Memory based on the
// function pointer.
func (queue *Queue) getCWStatsSet(getUsageFloat getUsageFloatFunc) (*ecstcs.CWStatsSet, error) {
queue.lock.Lock()
defer queue.lock.Unlock()
queueLength := len(queue.buffer)
if queueLength < 2 {
// Need at least 2 data points to calculate this.
return nil, fmt.Errorf("need at least 2 data points in queue to calculate CW stats set")
}
var min, max, sum float64
var sampleCount int64
min = math.MaxFloat64
max = -math.MaxFloat64
sum = 0
sampleCount = 0
for _, stat := range queue.buffer {
if stat.sent {
// don't send stats to TACS if already sent
continue
}
thisStat := getUsageFloat(&stat)
if math.IsNaN(thisStat) {
continue
}
min = math.Min(min, thisStat)
max = math.Max(max, thisStat)
sampleCount++
sum += thisStat
}
// don't emit metrics when sampleCount == 0
if sampleCount == 0 {
return nil, fmt.Errorf("need at least 1 non-NaN data points in queue to calculate CW stats set")
}
return &ecstcs.CWStatsSet{
Max: &max,
Min: &min,
SampleCount: &sampleCount,
Sum: &sum,
}, nil
}
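// Worked example (illustrative values): with unsent CPU samples of 10.0, 20.0
// and 30.0 percent in the buffer, getCWStatsSet(getCPUUsagePerc) returns
// Min=10, Max=30, Sum=60 and SampleCount=3; an average can be derived as
// Sum/SampleCount. NaN samples, such as the first sample added to an empty
// queue whose CPU percentage cannot be computed, are skipped and not counted.
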
// getULongStatsSet gets the stats set for the specified raw stat type.
// Stats come from Docker as uint64 and by necessity are packed into int64;
// where there is overflow (math.MaxInt64 + 1 or greater), we capture the
// excess in optional overflow fields.
func (queue *Queue) getULongStatsSet(getUsageInt getUsageIntFunc) (*ecstcs.ULongStatsSet, error) {
queue.lock.Lock()
defer queue.lock.Unlock()
queueLength := len(queue.buffer)
if queueLength < 2 {
// Need at least 2 data points to calculate this.
return nil, fmt.Errorf("need at least 2 data points in the queue to calculate int stats")
}
var min, max, sum uint64
var sampleCount int64
min = math.MaxUint64
max = 0
sum = 0
sampleCount = 0
for _, stat := range queue.buffer {
if stat.sent {
// don't send stats to TACS if already sent
continue
}
thisStat := getUsageInt(&stat)
if thisStat < min {
min = thisStat
}
if thisStat > max {
max = thisStat
}
sum += thisStat
sampleCount++
}
// don't emit metrics when sampleCount == 0
if sampleCount == 0 {
return nil, fmt.Errorf("need at least 1 non-NaN data points in queue to calculate CW stats set")
}
baseMin, overflowMin := getInt64WithOverflow(min)
baseMax, overflowMax := getInt64WithOverflow(max)
baseSum, overflowSum := getInt64WithOverflow(sum)
return &ecstcs.ULongStatsSet{
Max: &baseMax,
OverflowMax: &overflowMax,
Min: &baseMin,
OverflowMin: &overflowMin,
SampleCount: &sampleCount,
Sum: &baseSum,
OverflowSum: &overflowSum,
}, nil
}
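// Worked example (illustrative values): with unsent RxBytes samples of 1000,
// 2000 and 3000, getULongStatsSet(getNetworkRxBytes) returns Min=1000,
// Max=3000, Sum=6000 and SampleCount=3, with every Overflow* field set to 0.
// The Overflow* fields only become non-zero when a value exceeds math.MaxInt64
// and is split by getInt64WithOverflow above.
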
// getUDoubleCWStatsSet gets the stats set for per-second network metrics.
func (queue *Queue) getUDoubleCWStatsSet(getUsageFloat getUsageFloatFunc) (*ecstcs.UDoubleCWStatsSet, error) {
queue.lock.Lock()
defer queue.lock.Unlock()
queueLength := len(queue.buffer)
if queueLength < 2 {
// Need at least 2 data points to calculate this.
return nil, fmt.Errorf("need at least 2 data points in queue to calculate CW stats set")
}
var min, max, sum float64
var sampleCount int64
min = math.MaxFloat64
max = -math.MaxFloat64
sum = 0
sampleCount = 0
for _, stat := range queue.buffer {
if stat.sent {
// don't send stats to TACS if already sent
continue
}
thisStat := getUsageFloat(&stat)
if math.IsNaN(thisStat) {
continue
}
min = math.Min(min, thisStat)
max = math.Max(max, thisStat)
sampleCount++
sum += thisStat
}
// don't emit metrics when sampleCount == 0
if sampleCount == 0 {
return nil, fmt.Errorf("need at least 1 non-NaN data points in queue to calculate CW stats set")
}
return &ecstcs.UDoubleCWStatsSet{
Max: &max,
Min: &min,
SampleCount: &sampleCount,
Sum: &sum,
}, nil
}
// getAggregatedDockerStatAcrossRestarts gets the aggregated docker stat for a container across container restarts.
func getAggregatedDockerStatAcrossRestarts(dockerStat, lastStatBeforeLastRestart,
lastStatInStatsQueue *types.StatsJSON) *types.StatsJSON {
dockerStat = aggregateOSIndependentStats(dockerStat, lastStatBeforeLastRestart)
dockerStat = aggregateOSDependentStats(dockerStat, lastStatBeforeLastRestart)
// Stats relevant to PreCPU.
preCPUStats := types.CPUStats{}
preRead := time.Time{}
if lastStatInStatsQueue != nil {
preCPUStats = lastStatInStatsQueue.CPUStats
preRead = lastStatInStatsQueue.Read
}
dockerStat.PreCPUStats = preCPUStats
dockerStat.PreRead = preRead
logger.Debug("Aggregated Docker stat across restart(s)", logger.Fields{
loggerfield.DockerId: dockerStat.ID,
})
return dockerStat
}
// aggregateOSIndependentStats aggregates stats that are measured cumulatively against container start time and
// populated regardless of what OS is being used.
func aggregateOSIndependentStats(dockerStat, lastStatBeforeLastRestart *types.StatsJSON) *types.StatsJSON {
// CPU stats.
dockerStat.CPUStats.CPUUsage.TotalUsage += lastStatBeforeLastRestart.CPUStats.CPUUsage.TotalUsage
dockerStat.CPUStats.CPUUsage.UsageInKernelmode += lastStatBeforeLastRestart.CPUStats.CPUUsage.UsageInKernelmode
dockerStat.CPUStats.CPUUsage.UsageInUsermode += lastStatBeforeLastRestart.CPUStats.CPUUsage.UsageInUsermode
// Network stats.
for key, dockerStatNetwork := range dockerStat.Networks {
lastStatBeforeLastRestartNetwork, ok := lastStatBeforeLastRestart.Networks[key]
if ok {
dockerStatNetwork.RxBytes += lastStatBeforeLastRestartNetwork.RxBytes
dockerStatNetwork.RxPackets += lastStatBeforeLastRestartNetwork.RxPackets
dockerStatNetwork.RxDropped += lastStatBeforeLastRestartNetwork.RxDropped
dockerStatNetwork.TxBytes += lastStatBeforeLastRestartNetwork.TxBytes
dockerStatNetwork.TxPackets += lastStatBeforeLastRestartNetwork.TxPackets
dockerStatNetwork.TxDropped += lastStatBeforeLastRestartNetwork.TxDropped
}
dockerStat.Networks[key] = dockerStatNetwork
}
return dockerStat
}
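// Worked example (illustrative values): suppose the container's previous
// lifetime ended with CPUStats.CPUUsage.TotalUsage = 8e9 ns
// (lastStatBeforeLastRestart) and, after the restart, Docker reports a fresh
// TotalUsage of 1e9 ns measured from the new container start. Aggregation
// yields 9e9 ns, so the deltas computed in add() keep increasing across the
// restart instead of appearing to drop back toward zero.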