internal/metrics/cloudwatch.go (83 lines of code) (raw):
package metrics
import (
"context"
"sync"
"time"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/aws/aws-sdk-go/aws"
"k8s.io/klog"
)
// NewCloudWatchRegistry returns a MetricRegistry backed by the given
// CloudWatch client. The returned registry starts with an empty per-namespace
// buffer and its own mutex.
func NewCloudWatchRegistry(cw *cloudwatch.Client) MetricRegistry {
	registry := new(cloudwatchRegistry)
	registry.cw = cw
	registry.lock = new(sync.Mutex)
	registry.dataByNamespace = map[string][]*cloudwatchMetricDatum{}
	return registry
}
// cloudwatchRegistry keeps recorded metric values in memory, grouped by
// namespace, together with the CloudWatch client used to publish them.
type cloudwatchRegistry struct {
// cw is the CloudWatch client the buffered data is sent through.
cw *cloudwatch.Client
// lock is held by the methods of this type while they read or modify
// dataByNamespace.
lock *sync.Mutex
// dataByNamespace maps a metric namespace to the datums recorded for it
// that have not been cleared yet.
dataByNamespace map[string][]*cloudwatchMetricDatum
}
// cloudwatchMetricDatum is a single recorded observation waiting to be
// published.
type cloudwatchMetricDatum struct {
// spec identifies the metric (its namespace and metric name) the value
// belongs to.
spec *MetricSpec
// value is the observed measurement.
value float64
// dimensions are the name/value pairs attached to the observation.
dimensions map[string]string
// timestamp is the time associated with the observation.
timestamp time.Time
}
// Record buffers one observation of the metric described by spec so that a
// later Emit can publish it. The observation is stamped with the current time
// and appended to the buffer of spec.Namespace.
//
// The dimensions map is copied, so the caller may reuse or modify it after
// Record returns without affecting the buffered datum. A nil map stays nil.
// spec must not be nil.
func (r *cloudwatchRegistry) Record(spec *MetricSpec, value float64, dimensions map[string]string) {
	// Snapshot the dimensions before buffering: the datum is only read when
	// Emit runs, and holding on to the caller's map would let later mutations
	// (e.g. a map reused across Record calls) rewrite already-recorded data.
	var dims map[string]string
	if dimensions != nil {
		dims = make(map[string]string, len(dimensions))
		for name, val := range dimensions {
			dims[name] = val
		}
	}

	r.lock.Lock()
	defer r.lock.Unlock()
	r.dataByNamespace[spec.Namespace] = append(r.dataByNamespace[spec.Namespace], &cloudwatchMetricDatum{
		spec:       spec,
		value:      value,
		dimensions: dims,
		timestamp:  time.Now(),
	})
}
// Emit publishes every buffered datum to CloudWatch, one namespace at a time,
// splitting each namespace into PutMetricData requests of at most 1000 datums.
//
// On success the buffer is left empty and nil is returned. If a request
// fails, Emit stops and returns that error; datums that were already accepted
// are dropped from the buffer and everything not yet sent (including the
// failed batch) is kept, so a later Emit retries only the unsent data.
//
// The registry lock is held for the whole call, including the network
// requests.
func (r *cloudwatchRegistry) Emit() error {
	// we can emit up to 1000 values per PutMetricData
	const maxDatumsPerRequest = 1000

	r.lock.Lock()
	defer r.lock.Unlock()

	for namespace, data := range r.dataByNamespace {
		for start := 0; start < len(data); start += maxDatumsPerRequest {
			// Clamp the batch to the remaining datums so the last, shorter
			// batch never reads past the end of the slice.
			end := start + maxDatumsPerRequest
			if end > len(data) {
				end = len(data)
			}

			metricData := make([]types.MetricDatum, 0, end-start)
			for _, datum := range data[start:end] {
				// Left nil when the datum has no dimensions.
				var dimensions []types.Dimension
				for key, val := range datum.dimensions {
					dimensions = append(dimensions, types.Dimension{
						Name:  aws.String(key),
						Value: aws.String(val),
					})
				}
				metricData = append(metricData, types.MetricDatum{
					MetricName: aws.String(datum.spec.Metric),
					Value:      aws.Float64(datum.value),
					Dimensions: dimensions,
					Timestamp:  &datum.timestamp,
				})
			}

			_, err := r.cw.PutMetricData(context.TODO(), &cloudwatch.PutMetricDataInput{
				Namespace:  aws.String(namespace),
				MetricData: metricData,
			})
			if err != nil {
				// Batches before start were accepted; keep only the rest so a
				// retry does not publish duplicates.
				r.dataByNamespace[namespace] = data[start:]
				return err
			}
		}
		klog.Infof("emitted %d metrics to namespace: %s", len(data), namespace)
		// Removing the current key while ranging over a map is permitted and
		// ensures a failure in a later namespace does not re-send this one.
		delete(r.dataByNamespace, namespace)
	}
	return nil
}
// GetRegistered reports how many datums are currently buffered, summed over
// all namespaces. The registry lock is held while counting.
func (r *cloudwatchRegistry) GetRegistered() int {
	r.lock.Lock()
	defer r.lock.Unlock()

	var total int
	for namespace := range r.dataByNamespace {
		total += len(r.dataByNamespace[namespace])
	}
	return total
}