x-pack/metricbeat/module/awsfargate/task_stats/task_stats.go (179 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package task_stats
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"time"
dcontainer "github.com/docker/docker/api/types/container"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/module/docker/cpu"
"github.com/elastic/beats/v7/x-pack/metricbeat/module/awsfargate"
"github.com/elastic/elastic-agent-libs/logp"
)
// Package-level settings for the task_stats metricset.
var (
	// metricsetName is the name this metricset is registered under and the
	// name given to its logger.
	metricsetName = "task_stats"

	// taskStatsPath and taskPath are appended to $ECS_CONTAINER_METADATA_URI_V4
	// to build the task stats URL and the task metadata URL respectively.
	taskStatsPath = "task/stats"
	taskPath      = "task"

	// queryTaskMetadataEndpointTimeout is the single deadline shared by both
	// metadata requests made during one call to queryTaskMetadataEndpoints.
	queryTaskMetadataEndpointTimeout = time.Minute
)
// init registers the task_stats metricset with the global metricbeat registry,
// under the awsfargate module, as a default metricset. The registry later
// calls New to build an instance for each configured host, and Fetch is then
// invoked periodically on that instance.
func init() {
	mb.Registry.MustAddMetricSet(
		awsfargate.ModuleName,
		metricsetName,
		New,
		mb.DefaultMetricSet(),
	)
}
// MetricSet is the task_stats metricset. It embeds *awsfargate.MetricSet for
// the shared module state (NOTE(review): presumably that type supplies the
// mb.MetricSet methods via mb.BaseMetricSet — confirm) and adds its own Fetch
// method, a logger, and the two ECS task metadata endpoint URLs built by New.
type MetricSet struct {
	*awsfargate.MetricSet
	// logger is created by New with the metricset name; Fetch uses it to log
	// query failures.
	logger *logp.Logger
	// taskStatsEndpoint is "${ECS_CONTAINER_METADATA_URI_V4}/task/stats".
	taskStatsEndpoint string
	// taskEndpoint is "${ECS_CONTAINER_METADATA_URI_V4}/task".
	taskEndpoint string
}
// TaskInfo holds task-level metadata for the ECS Fargate task. It is copied
// from the ${ECS_CONTAINER_METADATA_URI_V4}/task response, and the same value
// is attached to every per-container Stats entry.
type TaskInfo struct {
	Cluster  string
	TaskARN  string
	Family   string
	Revision string
	// TaskDesiredStatus and TaskKnownStatus mirror the DesiredStatus and
	// KnownStatus fields of the task metadata response.
	TaskDesiredStatus string
	TaskKnownStatus   string
}
// Stats holds the metrics collected for a single container in the task,
// together with that container's metadata and the task-level TaskInfo.
// getStatsList builds one Stats value per container ID found in both the
// task stats response and the task metadata response.
type Stats struct {
	// Time is the read timestamp reported by the task stats endpoint.
	Time         common.Time
	taskInfo     TaskInfo
	Container    *container
	cpuStats     cpu.CPUStats
	memoryStats  memoryStats
	networkStats []networkStats
	blkioStats   blkioStats
}
// TaskMetadata is the subset of the ${ECS_CONTAINER_METADATA_URI_V4}/task
// response body that this metricset decodes.
type TaskMetadata struct {
	Cluster       string `json:"Cluster"`
	TaskARN       string `json:"TaskARN"`
	Family        string `json:"Family"`
	Revision      string `json:"Revision"`
	DesiredStatus string `json:"DesiredStatus"`
	KnownStatus   string `json:"KnownStatus"`
	// Limit is decoded from the "Limits" object of the response (note the
	// field name differs from the JSON key).
	Limit Limits `json:"Limits"`
	// Containers lists the task's containers as returned by the endpoint;
	// getStatsList skips the one named "~internal~ecs~pause".
	Containers []*container `json:"Containers"`
}
// Limits holds the "Limits" object of the ${ECS_CONTAINER_METADATA_URI_V4}/task
// response. Memory is the task's hard memory limit as configured in AWS ECS
// (NOTE(review): the unit is not visible here — presumably MiB; confirm).
type Limits struct {
	Memory uint64 `json:"Memory"`
}
// New creates a task_stats MetricSet from the given base configuration.
//
// It reads the ECS_CONTAINER_METADATA_URI_V4 environment variable and derives
// the task stats ("<uri>/task/stats") and task metadata ("<uri>/task")
// endpoint URLs from it. It returns an error in two cases:
//   - the shared awsfargate MetricSet cannot be created;
//   - the environment variable is unset or empty.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
	logger := logp.NewLogger(metricsetName)
	metricSet, err := awsfargate.NewMetricSet(base)
	if err != nil {
		return nil, fmt.Errorf("error creating %s metricset: %w", metricsetName, err)
	}

	// An empty value would produce relative URLs such as "/task/stats" that can
	// never be fetched, so treat it exactly like an unset variable and fail
	// at construction time instead of on every Fetch.
	ecsURI, ok := os.LookupEnv("ECS_CONTAINER_METADATA_URI_V4")
	if !ok || ecsURI == "" {
		return nil, fmt.Errorf("lookup $ECS_CONTAINER_METADATA_URI_V4 failed")
	}

	return &MetricSet{
		MetricSet:         metricSet,
		logger:            logger,
		taskStatsEndpoint: fmt.Sprintf("%s/%s", ecsURI, taskStatsPath),
		taskEndpoint:      fmt.Sprintf("%s/%s", ecsURI, taskPath),
	}, nil
}
// Fetch queries the ECS task metadata endpoints once and hands the resulting
// per-container stats to eventsMapping, which publishes them through report.
// If the query fails, the wrapped error is logged and returned, and nothing is
// passed to eventsMapping.
func (m *MetricSet) Fetch(report mb.ReporterV2) error {
	containerStats, queryErr := m.queryTaskMetadataEndpoints()
	if queryErr != nil {
		wrapped := fmt.Errorf("queryTaskMetadataEndpoints failed: %w", queryErr)
		m.logger.Error(wrapped)
		return wrapped
	}

	eventsMapping(report, containerStats)
	return nil
}
// queryTaskMetadataEndpoints fetches and decodes the task stats endpoint and
// the task metadata endpoint, then joins them into one Stats value per
// container via getStatsList. Both requests share a single deadline of
// queryTaskMetadataEndpointTimeout.
//
// A non-200 response from either endpoint is reported as an error instead of
// being handed to the JSON decoder. An error page then surfaces as a clear
// failure rather than a confusing decode error or a silently empty result.
func (m *MetricSet) queryTaskMetadataEndpoints() ([]Stats, error) {
	// Named ctx so that the context package is not shadowed.
	ctx, cancel := context.WithTimeout(context.Background(), queryTaskMetadataEndpointTimeout)
	defer cancel()

	// Collect per-container statistics from ${ECS_CONTAINER_METADATA_URI_V4}/task/stats.
	taskStatsResp, err := doTaskMetadataRequest(ctx, m.taskStatsEndpoint)
	if err != nil {
		return nil, err
	}
	defer taskStatsResp.Body.Close()

	taskStatsOutput, err := getTaskStats(taskStatsResp)
	if err != nil {
		return nil, fmt.Errorf("getTaskStats failed: %w", err)
	}

	// Collect container metadata from ${ECS_CONTAINER_METADATA_URI_V4}/task.
	taskResp, err := doTaskMetadataRequest(ctx, m.taskEndpoint)
	if err != nil {
		return nil, err
	}
	defer taskResp.Body.Close()

	taskOutput, err := getTask(taskResp)
	if err != nil {
		return nil, fmt.Errorf("getTask failed: %w", err)
	}

	return getStatsList(taskStatsOutput, taskOutput), nil
}

// doTaskMetadataRequest sends a GET request for endpoint, bound to ctx, and
// returns the response only when its status is 200 OK. On success the caller
// owns the response and must close its body. On any other status the body is
// drained and closed here, and an error naming the status and endpoint is
// returned.
func doTaskMetadataRequest(ctx context.Context, endpoint string) (*http.Response, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return nil, fmt.Errorf("http.NewRequestWithContext: %w", err)
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("http request failed: %w", err)
	}

	if resp.StatusCode != http.StatusOK {
		// Best effort: drain and close so the connection can be reused; we are
		// already returning an error, so failures here are not interesting.
		_, _ = io.Copy(io.Discard, resp.Body)
		_ = resp.Body.Close()
		return nil, fmt.Errorf("unexpected status code %d from %s", resp.StatusCode, endpoint)
	}
	return resp, nil
}
// getTaskStats decodes the body of a ${ECS_CONTAINER_METADATA_URI_V4}/task/stats
// response into a map of Docker stats keyed by container ID. These are the
// IDs that getStatsList matches against container.DockerId.
//
// The whole body is read, but it is not closed; the caller remains responsible
// for closing taskStatsResp.Body. On error a nil map is returned.
func getTaskStats(taskStatsResp *http.Response) (map[string]dcontainer.StatsResponse, error) {
	taskStatsBody, err := io.ReadAll(taskStatsResp.Body)
	if err != nil {
		return nil, fmt.Errorf("io.ReadAll failed: %w", err)
	}

	var taskStatsOutput map[string]dcontainer.StatsResponse
	if err := json.Unmarshal(taskStatsBody, &taskStatsOutput); err != nil {
		return nil, fmt.Errorf("json.Unmarshal failed: %w", err)
	}
	return taskStatsOutput, nil
}
// getTask decodes the body of a ${ECS_CONTAINER_METADATA_URI_V4}/task response
// into a TaskMetadata.
//
// The whole body is read, but it is not closed; the caller remains responsible
// for closing taskResp.Body. On error the zero TaskMetadata is returned rather
// than a partially decoded value.
func getTask(taskResp *http.Response) (TaskMetadata, error) {
	taskBody, err := io.ReadAll(taskResp.Body)
	if err != nil {
		return TaskMetadata{}, fmt.Errorf("io.ReadAll failed: %w", err)
	}

	var taskOutput TaskMetadata
	if err := json.Unmarshal(taskBody, &taskOutput); err != nil {
		return TaskMetadata{}, fmt.Errorf("json.Unmarshal failed: %w", err)
	}
	return taskOutput, nil
}
// getStatsList joins the per-container stats from the task stats endpoint with
// the container metadata from the task endpoint. It produces one Stats value
// for each container ID present in both inputs, and every value carries the
// same task-level TaskInfo. Two kinds of entry in taskOutput.Containers are
// skipped: the internal ECS pause container, and null entries. The order of
// the result follows map iteration and is therefore unspecified; the result is
// nil when no container matches.
func getStatsList(taskStatsOutput map[string]dcontainer.StatsResponse, taskOutput TaskMetadata) []Stats {
	taskInfo := TaskInfo{
		Family:            taskOutput.Family,
		TaskARN:           taskOutput.TaskARN,
		Cluster:           taskOutput.Cluster,
		Revision:          taskOutput.Revision,
		TaskDesiredStatus: taskOutput.DesiredStatus,
		TaskKnownStatus:   taskOutput.KnownStatus,
	}

	containersInfo := make(map[string]container, len(taskOutput.Containers))
	for _, c := range taskOutput.Containers {
		// A JSON null in the Containers array decodes to a nil pointer; skip it
		// instead of panicking on the dereference below.
		if c == nil {
			continue
		}
		// Skip ~internal~ecs~pause container
		if c.Name == "~internal~ecs~pause" {
			continue
		}
		containersInfo[c.DockerId] = *c
	}

	var formattedStats []Stats
	for id, taskStats := range taskStatsOutput {
		c, ok := containersInfo[id]
		if !ok {
			// Stats for a container with no (non-pause) metadata entry.
			continue
		}
		formattedStats = append(formattedStats, Stats{
			Time:         common.Time(taskStats.Read),
			taskInfo:     taskInfo,
			Container:    getContainerMetadata(&c),
			cpuStats:     getCPUStats(taskStats),
			memoryStats:  getMemoryStats(taskStats),
			networkStats: getNetworkStats(taskStats),
			blkioStats:   getBlkioStats(taskStats.BlkioStats),
		})
	}
	return formattedStats
}