pkg/metrics/gcm.go (225 lines of code) (raw):
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metrics
import (
"context"
"fmt"
"log"
"time"
monitoring "cloud.google.com/go/monitoring/apiv3/v2"
monitoringpb "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb"
"github.com/GoogleCloudPlatform/gke-prober/pkg/common"
googlepb "github.com/golang/protobuf/ptypes/timestamp"
gax "github.com/googleapis/gax-go/v2"
"google.golang.org/api/option"
metricpb "google.golang.org/genproto/googleapis/api/metric"
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
"google.golang.org/grpc/codes"
)
// createTimeSeriesDeadline bounds how long a single CreateTimeSeries call,
// including any retries made under its context, is allowed to take.
const createTimeSeriesDeadline = 55 * time.Second
// gcmProvider holds everything needed to write time series to Cloud
// Monitoring: the API client, the target project and the monitored resource
// that every time series built by this provider is attached to.
type gcmProvider struct {
// client is the Cloud Monitoring client used for CreateTimeSeries calls.
client *monitoring.MetricClient
// project is the project ID; writes are addressed to "projects/<project>".
project string
// resource is the monitored resource set on every time series this provider builds.
resource *monitoredrespb.MonitoredResource
}
// StartGCM creates a Cloud Monitoring client and returns a metrics provider
// bound to the project and monitored resource described by cfg.
//
// When cfg.Mode is common.ModeCluster the monitored resource is "k8s_cluster";
// for any other mode it is "k8s_node" and additionally carries the node name.
//
// The context is only used to create the client; it is deliberately not
// stored on the provider, and each recording call receives its own context.
//
// NOTE(review): a client creation failure terminates the process through
// log.Fatalf, so the returned error is always nil as the code stands.
func StartGCM(ctx context.Context, cfg common.Config) (*gcmProvider, error) {
	client, err := monitoring.NewMetricClient(ctx, option.WithUserAgent(common.UserAgent))
	if err != nil {
		log.Fatalf("Failed to create client: %v", err)
	}

	// Retry CreateTimeSeries on DeadlineExceeded/Unavailable with exponential
	// backoff starting at 1s, doubling, capped at 16s.
	newRetryer := func() gax.Retryer {
		retryable := []codes.Code{codes.DeadlineExceeded, codes.Unavailable}
		backoff := gax.Backoff{
			Initial:    time.Second,
			Max:        16 * time.Second,
			Multiplier: 2,
		}
		return gax.OnCodes(retryable, backoff)
	}
	client.CallOptions = &monitoring.MetricCallOptions{
		CreateTimeSeries: []gax.CallOption{gax.WithRetry(newRetryer)},
	}

	// Labels shared by both monitored resource types.
	resourceLabels := map[string]string{
		"project_id":   cfg.ProjectID,
		"location":     cfg.Location,
		"cluster_name": cfg.Cluster,
	}
	resourceType := "k8s_cluster"
	if cfg.Mode != common.ModeCluster {
		// Every non-cluster mode reports against the node.
		resourceType = "k8s_node"
		resourceLabels["node_name"] = cfg.NodeName
	}

	return &gcmProvider{
		client:  client,
		project: cfg.ProjectID,
		resource: &monitoredrespb.MonitoredResource{
			Type:   resourceType,
			Labels: resourceLabels,
		},
	}, nil
}
// gcmIntCounterPoint builds an int64 point whose interval covers the one
// second ending now (both timestamps truncated to whole seconds).
//
// NOTE(review): every point gets a fresh start time one second before its
// end time rather than a start time shared across points — confirm this is
// the intended way to report the cumulative metric.
func gcmIntCounterPoint(value int) *monitoringpb.Point {
	now := time.Now()
	window := &monitoringpb.TimeInterval{
		StartTime: &googlepb.Timestamp{Seconds: now.Add(-time.Second).Unix()},
		EndTime:   &googlepb.Timestamp{Seconds: now.Unix()},
	}
	sample := &monitoringpb.TypedValue{
		Value: &monitoringpb.TypedValue_Int64Value{Int64Value: int64(value)},
	}
	return &monitoringpb.Point{Interval: window, Value: sample}
}
// gcmIntGaugePoint builds an int64 point stamped with the current time
// (truncated to whole seconds) as its end time; no start time is set.
func gcmIntGaugePoint(value int) *monitoringpb.Point {
	observedAt := &googlepb.Timestamp{Seconds: time.Now().Unix()}
	sample := &monitoringpb.TypedValue{
		Value: &monitoringpb.TypedValue_Int64Value{Int64Value: int64(value)},
	}
	return &monitoringpb.Point{
		Interval: &monitoringpb.TimeInterval{EndTime: observedAt},
		Value:    sample,
	}
}
// intCounterTimeSeries builds a CUMULATIVE time series for the metric named
// by prefix, carrying the given labels, the provider's monitored resource and
// a single counter point holding value.
func (p *gcmProvider) intCounterTimeSeries(prefix string, labels map[string]string, value int) *monitoringpb.TimeSeries {
	series := &monitoringpb.TimeSeries{
		MetricKind: metricpb.MetricDescriptor_CUMULATIVE,
		Resource:   p.resource,
	}
	series.Metric = p.metricWithLabels(prefix, labels)
	series.Points = []*monitoringpb.Point{gcmIntCounterPoint(value)}
	return series
}
// intGaugeTimeSeries builds a GAUGE time series for the metric named by
// prefix, carrying the given labels, the provider's monitored resource and a
// single gauge point holding value.
func (p *gcmProvider) intGaugeTimeSeries(prefix string, labels map[string]string, value int) *monitoringpb.TimeSeries {
	series := &monitoringpb.TimeSeries{
		MetricKind: metricpb.MetricDescriptor_GAUGE,
		Resource:   p.resource,
	}
	series.Metric = p.metricWithLabels(prefix, labels)
	series.Points = []*monitoringpb.Point{gcmIntGaugePoint(value)}
	return series
}
// writeTimeSeries sends ts to Cloud Monitoring under "projects/<p.project>".
//
// An empty ts issues no RPC and returns nil, since there is nothing to write.
// Larger inputs are split into requests of at most 200 series, the documented
// per-request limit of CreateTimeSeries. Each request runs under its own
// context derived from ctx with a createTimeSeriesDeadline timeout.
//
// Writing is best-effort across chunks: a failed chunk does not prevent the
// remaining chunks from being attempted. The first error encountered is
// returned; nil means every request succeeded.
func (p *gcmProvider) writeTimeSeries(ctx context.Context, ts ...*monitoringpb.TimeSeries) error {
	// Maximum number of TimeSeries objects accepted by one CreateTimeSeries call.
	const maxSeriesPerRequest = 200

	name := fmt.Sprintf("projects/%s", p.project)

	var firstErr error
	for start := 0; start < len(ts); start += maxSeriesPerRequest {
		end := start + maxSeriesPerRequest
		if end > len(ts) {
			end = len(ts)
		}

		cctx, cancel := context.WithTimeout(ctx, createTimeSeriesDeadline)
		err := p.client.CreateTimeSeries(cctx, &monitoringpb.CreateTimeSeriesRequest{
			Name:       name,
			TimeSeries: ts[start:end],
		})
		// Release the timer now instead of deferring: a defer inside the loop
		// would keep every chunk's context alive until the function returns.
		cancel()

		if err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
// metricWithLabels returns a metric whose type is "<common.MetricPrefix>/<m>"
// and whose labels are the given map (stored as-is, not copied).
func (p *gcmProvider) metricWithLabels(m string, labels map[string]string) *metricpb.Metric {
	metricType := fmt.Sprintf("%s/%s", common.MetricPrefix, m)
	return &metricpb.Metric{Type: metricType, Labels: labels}
}
// metric returns the metric for m with an empty (non-nil) label map.
func (p *gcmProvider) metric(m string) *metricpb.Metric {
	noLabels := make(map[string]string)
	return p.metricWithLabels(m, noLabels)
}
// gcmClusterRecorder records cluster-level metrics by writing time series
// through the gcmProvider it wraps.
type gcmClusterRecorder struct {
// p is the provider used to build and write time series.
p *gcmProvider
}
// Close closes the underlying Cloud Monitoring client and returns the
// client's close error, if any.
func (p *gcmProvider) Close() error {
	err := p.client.Close()
	return err
}
// ClusterRecorder returns a new ClusterRecorder that writes through p.
func (p *gcmProvider) ClusterRecorder() ClusterRecorder {
	recorder := gcmClusterRecorder{p: p}
	return &recorder
}
// RecordNodeConditions writes one gauge time series per entry in counts for
// MetricClusterNodeCondition, using the entry's labels and its count as the
// value.
//
// Recording is best-effort: the method has no error return, so a failed write
// is logged instead of being silently dropped. Empty counts is a no-op.
func (r *gcmClusterRecorder) RecordNodeConditions(ctx context.Context, counts []LabelCount) {
	if len(counts) == 0 {
		return
	}
	ts := make([]*monitoringpb.TimeSeries, 0, len(counts))
	for _, c := range counts {
		ts = append(ts, r.p.intGaugeTimeSeries(MetricClusterNodeCondition, c.Labels, c.Count))
	}
	if err := r.p.writeTimeSeries(ctx, ts...); err != nil {
		log.Printf("Failed to write %s time series: %v", MetricClusterNodeCondition, err)
	}
}
// RecordAddonCounts writes one gauge time series per entry in counts for
// MetricClusterAddonsExpected, using the entry's labels and its count as the
// value.
//
// Recording is best-effort: the method has no error return, so a failed write
// is logged instead of being silently dropped. Empty counts is a no-op.
func (r *gcmClusterRecorder) RecordAddonCounts(ctx context.Context, counts []LabelCount) {
	if len(counts) == 0 {
		return
	}
	ts := make([]*monitoringpb.TimeSeries, 0, len(counts))
	for _, c := range counts {
		ts = append(ts, r.p.intGaugeTimeSeries(MetricClusterAddonsExpected, c.Labels, c.Count))
	}
	if err := r.p.writeTimeSeries(ctx, ts...); err != nil {
		log.Printf("Failed to write %s time series: %v", MetricClusterAddonsExpected, err)
	}
}
// RecordNodeAvailabilities writes one gauge time series per entry in counts
// for MetricClusterNodeAvailable, using the entry's labels and its count as
// the value.
//
// Recording is best-effort: the method has no error return, so a failed write
// is logged instead of being silently dropped. Empty counts is a no-op.
func (r *gcmClusterRecorder) RecordNodeAvailabilities(ctx context.Context, counts []LabelCount) {
	if len(counts) == 0 {
		return
	}
	ts := make([]*monitoringpb.TimeSeries, 0, len(counts))
	for _, c := range counts {
		ts = append(ts, r.p.intGaugeTimeSeries(MetricClusterNodeAvailable, c.Labels, c.Count))
	}
	if err := r.p.writeTimeSeries(ctx, ts...); err != nil {
		log.Printf("Failed to write %s time series: %v", MetricClusterNodeAvailable, err)
	}
}
// gcmNodeRecorder records node-level metrics by writing time series through
// the gcmProvider it wraps.
type gcmNodeRecorder struct {
// p is the provider used to build and write time series.
p *gcmProvider
}
// NodeRecorder returns a new NodeRecorder that writes through p.
func (p *gcmProvider) NodeRecorder() NodeRecorder {
	recorder := gcmNodeRecorder{p: p}
	return &recorder
}
// RecordAddonAvailabilies writes one gauge time series per entry in counts
// for MetricAddonAvailable, using the entry's labels and its count as the
// value.
//
// Recording is best-effort: the method has no error return, so a failed write
// is logged instead of being silently dropped. Empty counts is a no-op.
func (r *gcmNodeRecorder) RecordAddonAvailabilies(ctx context.Context, counts []LabelCount) {
	if len(counts) == 0 {
		return
	}
	ts := make([]*monitoringpb.TimeSeries, 0, len(counts))
	for _, c := range counts {
		ts = append(ts, r.p.intGaugeTimeSeries(MetricAddonAvailable, c.Labels, c.Count))
	}
	if err := r.p.writeTimeSeries(ctx, ts...); err != nil {
		log.Printf("Failed to write %s time series: %v", MetricAddonAvailable, err)
	}
}
// RecordContainerRestart writes a single counter time series with value 1
// for MetricAddonRestart, tagged with labels.
//
// Recording is best-effort: the method has no error return, so a failed write
// is logged instead of being silently dropped.
func (r *gcmNodeRecorder) RecordContainerRestart(ctx context.Context, labels map[string]string) {
	ts := r.p.intCounterTimeSeries(MetricAddonRestart, labels, 1)
	if err := r.p.writeTimeSeries(ctx, ts); err != nil {
		log.Printf("Failed to write %s time series: %v", MetricAddonRestart, err)
	}
}
// RecordAddonControlPlaneAvailability writes a single gauge time series with
// value 1 for MetricAddonCPAvailable, tagged with labels.
//
// Recording is best-effort: the method has no error return, so a failed write
// is logged instead of being silently dropped.
func (r *gcmNodeRecorder) RecordAddonControlPlaneAvailability(ctx context.Context, labels map[string]string) {
	ts := r.p.intGaugeTimeSeries(MetricAddonCPAvailable, labels, 1)
	if err := r.p.writeTimeSeries(ctx, ts); err != nil {
		log.Printf("Failed to write %s time series: %v", MetricAddonCPAvailable, err)
	}
}
// RecordNodeConditions writes one gauge time series with value 1 per label
// set in labels for MetricNodeCondition.
//
// Recording is best-effort: the method has no error return, so a failed write
// is logged instead of being silently dropped. Empty labels is a no-op.
func (r *gcmNodeRecorder) RecordNodeConditions(ctx context.Context, labels []map[string]string) {
	if len(labels) == 0 {
		return
	}
	ts := make([]*monitoringpb.TimeSeries, 0, len(labels))
	for _, l := range labels {
		ts = append(ts, r.p.intGaugeTimeSeries(MetricNodeCondition, l, 1))
	}
	if err := r.p.writeTimeSeries(ctx, ts...); err != nil {
		log.Printf("Failed to write %s time series: %v", MetricNodeCondition, err)
	}
}
// RecordNodeAvailability writes a single gauge time series with value 1 for
// MetricNodeAvailable, tagged with labels.
//
// Recording is best-effort: the method has no error return, so a failed write
// is logged instead of being silently dropped.
func (r *gcmNodeRecorder) RecordNodeAvailability(ctx context.Context, labels map[string]string) {
	ts := r.p.intGaugeTimeSeries(MetricNodeAvailable, labels, 1)
	if err := r.p.writeTimeSeries(ctx, ts); err != nil {
		log.Printf("Failed to write %s time series: %v", MetricNodeAvailable, err)
	}
}
// gcmProbeReporter is the ProbeRecorder returned by gcmProvider.ProbeRecorder;
// it records probe results through the gcmProvider it wraps.
type gcmProbeReporter struct {
// p is the provider used to build and write time series.
p *gcmProvider
}
// ProbeRecorder returns a new ProbeRecorder that writes through p.
func (p *gcmProvider) ProbeRecorder() ProbeRecorder {
	reporter := gcmProbeReporter{p: p}
	return &reporter
}
// RecordDNSLookupLatency is a no-op: this reporter writes nothing to Cloud
// Monitoring for DNS lookup latency, and elapsed is ignored.
func (r *gcmProbeReporter) RecordDNSLookupLatency(elapsed time.Duration) {}
// RecordHTTPGetLatency is a no-op: this reporter writes nothing to Cloud
// Monitoring for HTTP GET latency, and both statusCode and elapsed are ignored.
func (r *gcmProbeReporter) RecordHTTPGetLatency(statusCode int, elapsed time.Duration) {}
// RecordAddonHealth writes one gauge time series with value 1 per label set
// in labels for MetricClusterAddonCondition.
//
// Recording is best-effort: the method has no error return, so a failed write
// is logged instead of being silently dropped. Empty labels is a no-op.
func (r *gcmProbeReporter) RecordAddonHealth(ctx context.Context, labels []map[string]string) {
	if len(labels) == 0 {
		return
	}
	ts := make([]*monitoringpb.TimeSeries, 0, len(labels))
	for _, l := range labels {
		ts = append(ts, r.p.intGaugeTimeSeries(MetricClusterAddonCondition, l, 1))
	}
	if err := r.p.writeTimeSeries(ctx, ts...); err != nil {
		log.Printf("Failed to write %s time series: %v", MetricClusterAddonCondition, err)
	}
}
// addonMap converts an addon into metric labels: its Name under "addon", its
// Version under "version" and its Kind under "controller".
func addonMap(addon common.Addon) map[string]string {
	labels := make(map[string]string, 3)
	labels["addon"] = addon.Name
	labels["version"] = addon.Version
	labels["controller"] = addon.Kind
	return labels
}