util/common/metrics.go (307 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT
package common
import (
"bytes"
"context"
"crypto/rand"
"crypto/sha256"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"net"
"net/http"
"os"
"path/filepath"
"runtime"
"strings"
"time"
"collectd.org/api"
"collectd.org/exec"
"collectd.org/network"
"github.com/DataDog/datadog-go/statsd"
v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/prozz/aws-embedded-metrics-golang/emf"
)
const SleepDuration = 5 * time.Second
const TracesEndpoint = "4316/v1/traces"
const MetricEndpoint = "4316/v1/metrics"
// StartSendingMetrics will generate metrics load based on the receiver (e.g 5000 statsd metrics per minute)
func StartSendingMetrics(receiver string, duration, sendingInterval time.Duration, metricPerInterval int, metricLogGroup, metricNamespace string) (err error) {
go func() {
switch receiver {
case "statsd":
err = SendStatsdMetrics(metricPerInterval, []string{}, sendingInterval, duration)
case "collectd":
err = SendCollectDMetrics(metricPerInterval, sendingInterval, duration)
case "emf":
err = SendEMFMetrics(metricPerInterval, metricLogGroup, metricNamespace, sendingInterval, duration)
case "app_signals":
err = SendAppSignalMetrics(duration) //does app signals have dimension for metric?
case "traces":
err = SendAppSignalsTraceMetrics(duration) //does app signals have dimension for metric?
default:
}
}()
return err
}
func SendAppSignalsTraceMetrics(duration time.Duration) error {
baseDir := getBaseDir()
for i := 0; i < int(duration/(5*time.Second)); i++ {
startTime := time.Now().UnixNano()
traceID := generateTraceID()
traceIDStr := hex.EncodeToString(traceID[:])
err := processTraceFile(filepath.Join(baseDir, "traces.json"), startTime, traceIDStr)
if err != nil {
fmt.Println("Error processing trace file:", err)
return err
}
time.Sleep(5 * time.Second)
}
return nil
}
func getBaseDir() string {
if runtime.GOOS == "windows" {
return "C:\\Users\\Administrator\\amazon-cloudwatch-agent-test\\test\\app_signals\\resources\\traces"
}
return "/Users/ec2-user/amazon-cloudwatch-agent-test/test/app_signals/resources/traces"
}
func generateTraceID() [16]byte {
var r [16]byte
epochNow := time.Now().Unix()
binary.BigEndian.PutUint32(r[0:4], uint32(epochNow))
rand.Read(r[4:])
return r
}
func processTraceFile(filePath string, startTime int64, traceID string) error {
data, err := os.ReadFile(filePath)
if err != nil {
return err
}
modifiedData := strings.ReplaceAll(string(data), "START_TIME", fmt.Sprintf("%d", startTime))
modifiedData = strings.ReplaceAll(modifiedData, "TRACE_ID", traceID)
url := "http://127.0.0.1:" + TracesEndpoint
_, err = http.Post(url, "application/json", bytes.NewBufferString(modifiedData))
if err != nil {
return err
}
return nil
}
func SendCollectDMetrics(metricPerInterval int, sendingInterval, duration time.Duration) error {
// https://github.com/collectd/go-collectd/tree/92e86f95efac5eb62fa84acc6033e7a57218b606
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
client, err := network.Dial(
net.JoinHostPort("127.0.0.1", network.DefaultService),
network.ClientOptions{
SecurityLevel: network.None,
})
if err != nil {
return err
}
defer client.Close()
ticker := time.NewTicker(sendingInterval)
defer ticker.Stop()
endTimeout := time.After(duration)
// Sending the collectd metric within the first minute before the ticker kicks in the next minute
for t := 1; t <= metricPerInterval/2; t++ {
_ = client.Write(ctx, &api.ValueList{
Identifier: api.Identifier{
Host: exec.Hostname(),
Plugin: fmt.Sprint("gauge_", t),
Type: "gauge",
},
Time: time.Now(),
Interval: time.Minute,
Values: []api.Value{api.Gauge(t)},
})
err = client.Write(ctx, &api.ValueList{
Identifier: api.Identifier{
Host: exec.Hostname(),
Plugin: fmt.Sprint("counter_", t),
Type: "counter",
},
Time: time.Now(),
Interval: time.Minute,
Values: []api.Value{api.Counter(t)},
})
if err != nil && !errors.Is(err, network.ErrNotEnoughSpace) {
return err
}
}
time.Sleep(30 * time.Second)
if err := client.Flush(); err != nil {
return err
}
for {
select {
case <-ticker.C:
for t := 1; t <= metricPerInterval/2; t++ {
_ = client.Write(ctx, &api.ValueList{
Identifier: api.Identifier{
Host: exec.Hostname(),
Plugin: fmt.Sprint("gauge_", t),
Type: "gauge",
},
Time: time.Now(),
Interval: time.Minute,
Values: []api.Value{api.Gauge(t)},
})
err = client.Write(ctx, &api.ValueList{
Identifier: api.Identifier{
Host: exec.Hostname(),
Plugin: fmt.Sprint("counter_", t),
Type: "counter",
},
Time: time.Now(),
Interval: time.Minute,
Values: []api.Value{api.Counter(t)},
})
if err != nil && !errors.Is(err, network.ErrNotEnoughSpace) {
return err
}
}
if err := client.Flush(); err != nil {
return err
}
case <-endTimeout:
return nil
}
}
}
func processFile(filePath string, startTime int64) error {
data, err := os.ReadFile(filePath)
if err != nil {
fmt.Println("Error reading file:", err)
return nil
}
//replace START_TIME with the current time
modifiedData := strings.ReplaceAll(string(data), "START_TIME", fmt.Sprintf("%d", startTime))
//curl command
url := "http://127.0.0.1:" + MetricEndpoint
_, err = http.Post(url, "application/json", bytes.NewBufferString(modifiedData))
_, err = http.Post(url, "application/json", bytes.NewBufferString(modifiedData))
if err != nil {
fmt.Println("Failed to send POST request to", url)
fmt.Printf("Error: %v\n", err)
return err
}
return nil
}
func SendAppSignalMetrics(duration time.Duration) error {
// The bash script to be executed asynchronously.
dir, err := os.Getwd()
if err != nil {
fmt.Println("Error getting current directory:", err)
return err
}
fmt.Println("Current Directory:", dir)
// Determine the base directory for the files based on the OS
var baseDir string
if runtime.GOOS == "windows" {
baseDir = filepath.Join("C:", "Users", "Administrator", "amazon-cloudwatch-agent-test", "test", "app_signals", "resources", "metrics")
} else { // assuming macOS or Unix-like system
baseDir = filepath.Join("/", "Users", "ec2-user", "amazon-cloudwatch-agent-test", "test", "app_signals", "resources", "metrics")
}
fmt.Println("Base directory:", baseDir)
for i := 0; i < int(duration/SleepDuration); i++ {
if err != nil {
return err
}
//start time to send to process file
startTime := time.Now().UnixNano()
//process files
err = processFile(filepath.Join(baseDir, "server_consumer.json"), startTime)
if err != nil {
return err
}
err = processFile(filepath.Join(baseDir, "client_producer.json"), startTime)
if err != nil {
return err
}
time.Sleep(5 * time.Second)
}
return nil
}
func SendStatsdMetrics(metricPerInterval int, metricDimension []string, sendingInterval, duration time.Duration) error {
// https://github.com/DataDog/datadog-go#metrics
client, err := statsd.New("127.0.0.1:8125", statsd.WithMaxMessagesPerPayload(100), statsd.WithNamespace("statsd"), statsd.WithoutTelemetry())
if err != nil {
return err
}
defer client.Close()
ticker := time.NewTicker(sendingInterval)
defer ticker.Stop()
endTimeout := time.After(duration)
// Sending the statsd metric within the first minute before the ticker kicks in the next minute
for t := 1; t <= metricPerInterval/2; t++ {
if err := client.Count(fmt.Sprint("counter_", t), int64(t), metricDimension, 1.0); err != nil {
return err
}
if err := client.Gauge(fmt.Sprint("gauge_", t), float64(t), metricDimension, 1.0); err != nil {
return err
}
}
for {
select {
case <-ticker.C:
for t := 1; t <= metricPerInterval/2; t++ {
client.Count(fmt.Sprint("counter_", t), int64(t), metricDimension, 1.0)
client.Gauge(fmt.Sprint("gauge_", t), float64(t), metricDimension, 1.0)
}
case <-endTimeout:
return nil
}
}
}
func SendEMFMetrics(metricPerInterval int, metricLogGroup, metricNamespace string, sendingInterval, duration time.Duration) error {
// github.com/prozz/aws-embedded-metrics-golang/emf
conn, err := net.DialTimeout("tcp", "127.0.0.1:25888", time.Millisecond*10000)
if err != nil {
return err
}
defer conn.Close()
ticker := time.NewTicker(60 * time.Second)
defer ticker.Stop()
endTimeout := time.After(duration)
for t := 1; t <= metricPerInterval; t++ {
emf.New(emf.WithWriter(conn), emf.WithLogGroup(metricLogGroup)).
Namespace(metricNamespace).
DimensionSet(
emf.NewDimension("InstanceId", metricLogGroup),
).
MetricAs(fmt.Sprint("emf_time_", t), t, emf.Milliseconds).
Log()
}
for {
select {
case <-ticker.C:
for t := 1; t <= metricPerInterval; t++ {
emf.New(emf.WithWriter(conn), emf.WithLogGroup(metricLogGroup)).
Namespace(metricNamespace).
DimensionSet(
emf.NewDimension("InstanceId", metricLogGroup),
).
MetricAs(fmt.Sprint("emf_time_", t), t, emf.Milliseconds).
Log()
}
case <-endTimeout:
return nil
}
}
}
// This function builds and signs an ListEntitiesForMetric call, essentially trying to replicate this curl command:
//
// curl -i -X POST monitoring.us-west-2.amazonaws.com -H 'Content-Type: application/json' \
// -H 'Content-Encoding: amz-1.0' \
// --user "$AWS_ACCESS_KEY_ID:$AWS_SECRET_ACCESS_KEY" \
// -H "x-amz-security-token: $AWS_SESSION_TOKEN" \
// --aws-sigv4 "aws:amz:us-west-2:monitoring" \
// -H 'X-Amz-Target: com.amazonaws.cloudwatch.v2013_01_16.CloudWatchVersion20130116.ListEntitiesForMetric' \
// -d '{
// // sample request body:
// "Namespace": "CWAgent",
// "MetricName": "cpu_usage_idle",
// "Dimensions": [{"Name": "InstanceId", "Value": "i-0123456789012"}, { "Name": "cpu", "Value": "cpu-total"}]
// }'
func BuildListEntitiesForMetricRequest(body []byte, region string) (*http.Request, error) {
cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion(region))
if err != nil {
return nil, err
}
signer := v4.NewSigner()
h := sha256.New()
h.Write(body)
payloadHash := hex.EncodeToString(h.Sum(nil))
// build the request
req, err := http.NewRequest("POST", "https://monitoring."+region+".amazonaws.com/", bytes.NewReader(body))
if err != nil {
return nil, err
}
// set headers
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Amz-Target", "com.amazonaws.cloudwatch.v2013_01_16.CloudWatchVersion20130116.ListEntitiesForMetric")
req.Header.Set("Content-Encoding", "amz-1.0")
// set creds
credentials, err := cfg.Credentials.Retrieve(context.TODO())
if err != nil {
return nil, err
}
req.Header.Set("x-amz-security-token", credentials.SessionToken)
// sign the request
err = signer.SignHTTP(context.TODO(), credentials, req, payloadHash, "monitoring", region, time.Now())
if err != nil {
return nil, err
}
return req, nil
}