func()

in agent/stats/engine.go [813:943]


// taskContainerMetricsUnsafe builds one ContainerMetric for every container the
// engine tracks under taskArn.
//
// CPU and memory stats are mandatory: a container for which either cannot be
// read is left out of the result. Storage, restart and network stats are
// optional; failures to read them are logged and the container is still
// reported. Errors for containers younger than the grace period are not logged,
// because recently started containers normally have not filled their stats
// queues yet.
//
// Side effect: each container is first passed to stopTrackingContainerUnsafe so
// that terminal containers missed via docker events are cleaned up; containers
// for which it returns true are skipped.
//
// It returns an error if taskArn is not present in tasksToContainers, or if the
// task uses awsvpc network mode and has no entry in taskToTaskStats.
//
// NOTE(review): the "Unsafe" suffix suggests the caller must already hold the
// engine's lock — not enforced here; confirm at call sites.
func (engine *DockerStatsEngine) taskContainerMetricsUnsafe(taskArn string) ([]*ecstcs.ContainerMetric, error) {
	// gracePeriod is how long after start a container may have missing
	// metrics without errors or warnings being logged.
	const gracePeriod = 30 * time.Second

	containerMap, taskExists := engine.tasksToContainers[taskArn]
	if !taskExists {
		return nil, fmt.Errorf("task not found")
	}

	var containerMetrics []*ecstcs.ContainerMetric
	for _, container := range containerMap {
		dockerID := container.containerMetadata.DockerID
		// Check if the container is terminal. If it is, make sure that it is
		// cleaned up properly. We might sometimes miss events from docker task
		// engine and this helps in reconciling the state. The tcs client's
		// GetInstanceMetrics probe is used as the trigger for this.
		if engine.stopTrackingContainerUnsafe(container, taskArn) {
			continue
		}
		// age decides whether missing metrics are expected (young container)
		// or worth reporting.
		age := time.Since(container.containerMetadata.StartedAt)

		// CPU and Memory are both critical, so skip the container if either of these fail.
		cpuStatsSet, err := container.statsQueue.GetCPUStatsSet()
		if err != nil {
			if age >= gracePeriod {
				logger.Error("Error collecting cloudwatch metrics for container", logger.Fields{
					field.Container: dockerID,
					field.Error:     err,
				})
			}
			continue
		}
		memoryStatsSet, err := container.statsQueue.GetMemoryStatsSet()
		if err != nil {
			if age >= gracePeriod {
				logger.Error("Error collecting cloudwatch metrics for container", logger.Fields{
					field.Container: dockerID,
					field.Error:     err,
				})
			}
			continue
		}

		containerMetric := &ecstcs.ContainerMetric{
			ContainerName:  &container.containerMetadata.Name,
			CpuStatsSet:    cpuStatsSet,
			MemoryStatsSet: memoryStatsSet,
		}

		storageStatsSet, err := container.statsQueue.GetStorageStatsSet()
		if err != nil && age > gracePeriod {
			logger.Warn("Error getting storage stats for container", logger.Fields{
				field.Container: dockerID,
				field.Error:     err,
			})
		} else {
			containerMetric.StorageStatsSet = storageStatsSet
		}

		restartStatsSet, err := container.statsQueue.GetRestartStatsSet()
		if err != nil && age > gracePeriod {
			// we expect to get an error here if there are no restart metrics,
			// which would be common as it just means there is no restart policy on
			// the container, so just log a debug message here.
			logger.Debug("Unable to get restart stats for container", logger.Fields{
				field.Container: dockerID,
				field.Error:     err,
			})
		} else {
			containerMetric.RestartStatsSet = restartStatsSet
		}

		task, err := engine.resolver.ResolveTask(dockerID)
		if err != nil {
			logger.Warn("Task not found for container", logger.Fields{
				field.Container: dockerID,
				field.Error:     err,
			})
		} else if dockerContainer, err := engine.resolver.ResolveContainer(dockerID); err != nil {
			logger.Warn("Could not map container ID to container, container", logger.Fields{
				field.DockerId: dockerID,
				field.Error:    err,
			})
		} else {
			// Attach network stats for bridge and awsvpc network modes; other
			// modes get no network stats here.
			switch {
			case task.IsNetworkModeBridge():
				if task.IsServiceConnectEnabled() && dockerContainer.Container.Type == apicontainer.ContainerCNIPause {
					logger.Debug("Skip adding network stats for pause container in Service Connect enabled task", logger.Fields{
						field.TaskARN:   taskArn,
						field.Container: dockerID,
					})
					break
				}
				networkStatsSet, err := container.statsQueue.GetNetworkStatsSet()
				if err != nil && age > gracePeriod {
					// we log the error and still continue to publish cpu, memory stats
					logger.Warn("Error getting network stats for container", logger.Fields{
						field.Container: dockerID,
						field.Error:     err,
					})
				} else {
					containerMetric.NetworkStatsSet = networkStatsSet
				}
			case task.IsNetworkModeAWSVPC():
				// Looked up per container on purpose: stopTrackingContainerUnsafe
				// (called above) may mutate engine state during this loop.
				taskStatsMap, taskExistsInTaskStats := engine.taskToTaskStats[taskArn]
				if !taskExistsInTaskStats {
					// NOTE: this discards metrics already gathered for other
					// containers of the task; kept for caller compatibility.
					return nil, fmt.Errorf("task not found")
				}
				// do not add network stats for pause container
				if dockerContainer.Container.Type != apicontainer.ContainerCNIPause {
					networkStats, err := taskStatsMap.StatsQueue.GetNetworkStatsSet()
					if err != nil && age > gracePeriod {
						logger.Warn("Error getting network stats for container", logger.Fields{
							field.TaskARN:   taskArn,
							field.Container: dockerContainer.DockerID,
							field.Error:     err,
						})
					} else {
						containerMetric.NetworkStatsSet = networkStats
					}
				}
			}
		}
		containerMetrics = append(containerMetrics, containerMetric)
	}
	return containerMetrics, nil
}