prometheus-to-sd/translator/translator.go (476 lines of code) (raw):

/* Copyright 2017 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package translator import ( "context" "fmt" "math" "strings" "time" "github.com/golang/glog" dto "github.com/prometheus/client_model/go" "google.golang.org/genproto/googleapis/api/distribution" "google.golang.org/genproto/googleapis/api/label" "google.golang.org/genproto/googleapis/api/metric" "google.golang.org/genproto/googleapis/api/monitoredres" v3 "google.golang.org/genproto/googleapis/monitoring/v3" "google.golang.org/protobuf/types/known/timestamppb" "github.com/GoogleCloudPlatform/k8s-stackdriver/prometheus-to-sd/config" ) const ( // Built-in Prometheus metric exporting process start time. processStartTimeMetric = "process_start_time_seconds" ) var supportedMetricTypes = map[dto.MetricType]bool{ dto.MetricType_COUNTER: true, dto.MetricType_GAUGE: true, dto.MetricType_HISTOGRAM: true, dto.MetricType_UNTYPED: true, } const falseValueEpsilon = 0.001 // TimeSeriesBuilder keeps track of incoming prometheus updates and can convert // last received one to Stackdriver TimeSeries. type TimeSeriesBuilder struct { config *config.CommonConfig cache *MetricDescriptorCache batch *batchWithTimestamp } type batchWithTimestamp struct { metrics *PrometheusResponse timestamp time.Time } // NewTimeSeriesBuilder creates new builder object that keeps intermediate state of metrics. func NewTimeSeriesBuilder(commonConfig *config.CommonConfig, cache *MetricDescriptorCache) *TimeSeriesBuilder { return &TimeSeriesBuilder{ config: commonConfig, cache: cache, } } // Update updates the internal state with current batch. func (t *TimeSeriesBuilder) Update(batch *PrometheusResponse, timestamp time.Time) { t.batch = &batchWithTimestamp{batch, timestamp} } // Build returns a new TimeSeries array and restarts the internal state. func (t *TimeSeriesBuilder) Build(ctx context.Context) ([]*v3.TimeSeries, time.Time, error) { var ts []*v3.TimeSeries if t.batch == nil { return ts, time.Now(), nil } defer func() { t.batch = nil }() metricFamilies, err := t.batch.metrics.Build(ctx, t.config, t.cache) if err != nil { return ts, time.Now(), err } // Get start time before whitelisting, because process start time // metric is likely not to be whitelisted. startTime := t.getStartTime(metricFamilies) metricFamilies = filterWhitelistedMetrics(metricFamilies, t.config.SourceConfig.Whitelisted) metricFamilies = filterWhitelistedLabels(metricFamilies, t.config.SourceConfig.WhitelistedLabelsMap) for name, metric := range metricFamilies { if t.cache.IsMetricBroken(name) { continue } f, err := translateFamily(t.config, metric, t.batch.timestamp, startTime, t.cache) if err != nil { glog.Warningf("Error while processing metric %s: %v", name, err) } else { ts = append(ts, f...) } } return ts, t.batch.timestamp, nil } // OmitComponentName removes from the metric names prefix that is equal to component name. func OmitComponentName(metricFamilies map[string]*dto.MetricFamily, componentName string) map[string]*dto.MetricFamily { result := make(map[string]*dto.MetricFamily) for metricName, metricFamily := range metricFamilies { newMetricName := strings.TrimPrefix(metricName, fmt.Sprintf("%s_", componentName)) metricFamily.Name = &newMetricName result[newMetricName] = metricFamily } return result } // DowncaseMetricNames downcases metric names. func DowncaseMetricNames(metricFamilies map[string]*dto.MetricFamily) map[string]*dto.MetricFamily { result := make(map[string]*dto.MetricFamily) for metricName, metricFamily := range metricFamilies { newMetricName := strings.ToLower(metricName) metricFamily.Name = &newMetricName result[newMetricName] = metricFamily } return result } // FlattenSummaryMetricFamilies flattens summary metric families into two counter metrics, // one for the running sum and count, respectively func FlattenSummaryMetricFamilies(metricFamilies map[string]*dto.MetricFamily) map[string]*dto.MetricFamily { result := make(map[string]*dto.MetricFamily) for metricName, family := range metricFamilies { switch family.GetType() { case dto.MetricType_SUMMARY: if len(family.Metric) < 1 { glog.V(2).Infof("Summary metric %v does not have metric data associated, ignoring", family.Name) continue } result[metricName+"_sum"] = sumMetricFromSummary(family.GetName(), family.Metric) result[metricName+"_count"] = countMetricFromSummary(family.GetName(), family.Metric) default: result[metricName] = family } } return result } // sumMetricFromSummary manipulates a Summary to extract out a specific sum MetricType_COUNTER metric func sumMetricFromSummary(name string, metrics []*dto.Metric) *dto.MetricFamily { n := name + "_sum" t := dto.MetricType_COUNTER newMetrics := make([]*dto.Metric, 0, len(metrics)) for _, m := range metrics { s := m.Summary.GetSampleSum() newMetric := &dto.Metric{ Label: m.Label, Counter: &dto.Counter{ Value: &s, }, } newMetrics = append(newMetrics, newMetric) } return &dto.MetricFamily{ Type: &t, Name: &n, Metric: newMetrics, } } // countMetricFromSummary manipulates a Summary to extract out a specific count MetricType_COUNTER metric func countMetricFromSummary(name string, metrics []*dto.Metric) *dto.MetricFamily { n := name + "_count" t := dto.MetricType_COUNTER newMetrics := make([]*dto.Metric, 0, len(metrics)) for _, m := range metrics { c := float64(m.Summary.GetSampleCount()) newMetric := &dto.Metric{ Label: m.Label, Counter: &dto.Counter{ Value: &c, }, } newMetrics = append(newMetrics, newMetric) } return &dto.MetricFamily{ Type: &t, Name: &n, Metric: newMetrics, } } func (t *TimeSeriesBuilder) getStartTime(metrics map[string]*dto.MetricFamily) time.Time { // For cumulative metrics we need to know process start time. // If the process start time is not specified, assuming it's // the unix 1 second, because Stackdriver can't handle // unix zero or unix negative number. startTime := time.Unix(1, 0) if family, found := metrics[processStartTimeMetric]; found && family.GetType() == dto.MetricType_GAUGE && len(family.GetMetric()) == 1 { startSec := family.Metric[0].Gauge.Value startTime = time.Unix(int64(*startSec), 0) glog.V(4).Infof("Monitored process start time: %v", startTime) } else { glog.Warningf("Metric %s invalid or not defined for component %s. Using %v instead. Cumulative metrics might be inaccurate.", processStartTimeMetric, t.config.SourceConfig.Component, startTime) } return startTime } func filterWhitelistedMetrics(allMetrics map[string]*dto.MetricFamily, whitelisted []string) map[string]*dto.MetricFamily { if len(whitelisted) == 0 { return allMetrics } glog.V(4).Infof("Exporting only whitelisted metrics: %v", whitelisted) res := map[string]*dto.MetricFamily{} for _, w := range whitelisted { if family, found := allMetrics[w]; found { res[w] = family } else { glog.V(3).Infof("Whitelisted metric %s not present in Prometheus endpoint.", w) } } return res } func filterWhitelistedLabels(allMetrics map[string]*dto.MetricFamily, whitelistedLabelsMap map[string]map[string]bool) map[string]*dto.MetricFamily { if len(whitelistedLabelsMap) == 0 { return allMetrics } glog.V(4).Infof("Exporting only whitelisted label values: %v", whitelistedLabelsMap) res := map[string]*dto.MetricFamily{} for metricName, metricFamily := range allMetrics { var filteredMetrics []*dto.Metric for _, metric := range metricFamily.Metric { labels := metric.GetLabel() for _, label := range labels { if whitelistedLabelValues, found := whitelistedLabelsMap[*label.Name]; found && whitelistedLabelValues[*label.Value] { filteredMetrics = append(filteredMetrics, metric) } } } if len(filteredMetrics) > 0 { res[metricName] = &dto.MetricFamily{ Name: metricFamily.Name, Help: metricFamily.Help, Type: metricFamily.Type, Metric: filteredMetrics, } } else { glog.V(3).Infof("Whitelisted label values for metric %s not found in Prometheus endpoint.", metricFamily) } } return res } func translateFamily(config *config.CommonConfig, family *dto.MetricFamily, timestamp time.Time, startTime time.Time, cache *MetricDescriptorCache) ([]*v3.TimeSeries, error) { glog.V(3).Infof("Translating metric family %v from component %v", family.GetName(), config.SourceConfig.Component) var ts []*v3.TimeSeries if _, found := supportedMetricTypes[family.GetType()]; !found { return ts, fmt.Errorf("metric type %v of family %s not supported", family.GetType(), family.GetName()) } for _, metric := range family.GetMetric() { t := translateOne(config, family.GetName(), family.GetType(), metric, startTime, timestamp, cache) ts = append(ts, t) glog.V(4).Infof("%+v\nMetric: %+v, Interval: %+v", *t, *(t.Metric), t.Points[0].Interval) } return ts, nil } // getMetricType creates metric type name base on the metric prefix, component name and metric name. func getMetricType(config *config.CommonConfig, name string) string { if config.SourceConfig.Component == "" { return fmt.Sprintf("%s/%s", config.SourceConfig.MetricsPrefix, name) } return fmt.Sprintf("%s/%s/%s", config.SourceConfig.MetricsPrefix, config.SourceConfig.Component, name) } // assumes that mType is Counter, Gauge or Histogram func translateOne(config *config.CommonConfig, name string, mType dto.MetricType, m *dto.Metric, start time.Time, end time.Time, cache *MetricDescriptorCache) *v3.TimeSeries { interval := &v3.TimeInterval{ EndTime: timestamppb.New(end.UTC()), } metricKind := extractMetricKind(mType) if metricKind == metric.MetricDescriptor_CUMULATIVE { interval.StartTime = timestamppb.New(start.UTC()) } valueType := extractValueType(mType, cache.getMetricDescriptor(name)) point := &v3.Point{ Interval: interval, Value: &v3.TypedValue{}, } setValue(mType, valueType, m, point) mr := getMonitoredResourceFromLabels(config, m.GetLabel()) glog.V(4).Infof("MonitoredResource to write: %v", mr) return &v3.TimeSeries{ Metric: &metric.Metric{ Labels: getMetricLabels(config, m.GetLabel()), Type: getMetricType(config, name), }, Resource: getMonitoredResourceFromLabels(config, m.GetLabel()), MetricKind: metricKind, ValueType: valueType, Points: []*v3.Point{point}, } } func setValue(mType dto.MetricType, valueType metric.MetricDescriptor_ValueType, metric *dto.Metric, point *v3.Point) { if mType == dto.MetricType_GAUGE { setValueBaseOnSimpleType(metric.GetGauge().GetValue(), valueType, point) } else if mType == dto.MetricType_HISTOGRAM { val := convertToDistributionValue(metric.GetHistogram()) point.Value = &v3.TypedValue{ Value: &v3.TypedValue_DistributionValue{DistributionValue: val}, } } else if mType == dto.MetricType_UNTYPED { setValueBaseOnSimpleType(metric.GetUntyped().GetValue(), valueType, point) } else { setValueBaseOnSimpleType(metric.GetCounter().GetValue(), valueType, point) } } func setValueBaseOnSimpleType(value float64, valueType metric.MetricDescriptor_ValueType, point *v3.Point) { if valueType == metric.MetricDescriptor_INT64 { point.Value = &v3.TypedValue{ Value: &v3.TypedValue_Int64Value{Int64Value: int64(value)}, } } else if valueType == metric.MetricDescriptor_DOUBLE { point.Value = &v3.TypedValue{ Value: &v3.TypedValue_DoubleValue{DoubleValue: value}, } } else if valueType == metric.MetricDescriptor_BOOL { var val = math.Abs(value) > falseValueEpsilon point.Value = &v3.TypedValue{ Value: &v3.TypedValue_BoolValue{BoolValue: val}, } } else { glog.Errorf("Value type '%s' is not supported yet.", valueType) } } func convertToDistributionValue(h *dto.Histogram) *distribution.Distribution { count := int64(h.GetSampleCount()) mean := float64(0) dev := float64(0) var bounds []float64 var values []int64 if count > 0 { mean = h.GetSampleSum() / float64(count) } prevVal := uint64(0) lower := float64(0) infSeen := false for _, b := range h.Bucket { upper := b.GetUpperBound() if !math.IsInf(b.GetUpperBound(), 1) { bounds = append(bounds, b.GetUpperBound()) } else { infSeen = true upper = lower } val := b.GetCumulativeCount() - prevVal x := (lower + upper) / float64(2) dev += float64(val) * (x - mean) * (x - mean) values = append(values, int64(b.GetCumulativeCount()-prevVal)) lower = b.GetUpperBound() prevVal = b.GetCumulativeCount() } // +Inf Bucket is implicit so it needs to be added if !infSeen && count > int64(prevVal) { values = append(values, count-int64(prevVal)) } return &distribution.Distribution{ Count: count, Mean: mean, SumOfSquaredDeviation: dev, BucketOptions: &distribution.Distribution_BucketOptions{ Options: &distribution.Distribution_BucketOptions_ExplicitBuckets{ ExplicitBuckets: &distribution.Distribution_BucketOptions_Explicit{ Bounds: bounds, }, }, }, BucketCounts: values, } } func getMetricLabels(config *config.CommonConfig, labels []*dto.LabelPair) map[string]string { metricLabels := map[string]string{} for _, label := range labels { if config.SourceConfig.PodConfig.IsMetricLabel(label.GetName()) { metricLabels[label.GetName()] = label.GetValue() } } return metricLabels } // MetricFamilyToMetricDescriptor converts MetricFamily object to the MetricDescriptor. If needed it uses information // from the previously created metricDescriptor (for example to merge label sets). func MetricFamilyToMetricDescriptor(config *config.CommonConfig, family *dto.MetricFamily, originalDescriptor *metric.MetricDescriptor) *metric.MetricDescriptor { return &metric.MetricDescriptor{ Description: family.GetHelp(), Type: getMetricType(config, family.GetName()), MetricKind: extractMetricKind(family.GetType()), ValueType: extractValueType(family.GetType(), originalDescriptor), Labels: extractAllLabels(family, originalDescriptor), } } func extractMetricKind(mType dto.MetricType) metric.MetricDescriptor_MetricKind { if mType == dto.MetricType_COUNTER || mType == dto.MetricType_HISTOGRAM { return metric.MetricDescriptor_CUMULATIVE } return metric.MetricDescriptor_GAUGE } func extractValueType(mType dto.MetricType, originalDescriptor *metric.MetricDescriptor) metric.MetricDescriptor_ValueType { // If MetricDescriptor is created already in the Stackdriver use stored value type. // This is going to work perfectly for "container.googleapis.com" metrics. if originalDescriptor != nil { // TODO(loburm): for custom metrics add logic that can figure value type base on the actual values. return originalDescriptor.ValueType } if mType == dto.MetricType_HISTOGRAM { return metric.MetricDescriptor_DISTRIBUTION } if mType == dto.MetricType_UNTYPED { return metric.MetricDescriptor_DOUBLE } return metric.MetricDescriptor_INT64 } func extractAllLabels(family *dto.MetricFamily, originalDescriptor *metric.MetricDescriptor) []*label.LabelDescriptor { var labels []*label.LabelDescriptor labelSet := make(map[string]bool) for _, metric := range family.GetMetric() { for _, l := range metric.GetLabel() { _, ok := labelSet[l.GetName()] if !ok { labels = append(labels, &label.LabelDescriptor{Key: l.GetName()}) labelSet[l.GetName()] = true } } } if originalDescriptor != nil { for _, l := range originalDescriptor.Labels { _, ok := labelSet[l.Key] if !ok { labels = append(labels, l) labelSet[l.Key] = true } } } return labels } func createProjectName(config *config.GceConfig) string { return fmt.Sprintf("projects/%s", config.Project) } func getMonitoredResourceFromLabels(config *config.CommonConfig, labels []*dto.LabelPair) *monitoredres.MonitoredResource { if config.SourceConfig.CustomResourceType != "" { return getCustomMonitoredResource(config) } container, pod, namespace := config.SourceConfig.PodConfig.GetPodInfo(labels) prefix := config.MonitoredResourceTypePrefix if prefix == "" { return &monitoredres.MonitoredResource{ Type: "gke_container", Labels: map[string]string{ "project_id": config.GceConfig.Project, "cluster_name": config.GceConfig.Cluster, "zone": config.GceConfig.Zone, "instance_id": config.GceConfig.Instance, "namespace_id": namespace, "pod_id": pod, "container_name": container, }, } } resourceLabels := make(map[string]string) for k, v := range config.MonitoredResourceLabels { resourceLabels[k] = v } if _, found := resourceLabels["project_id"]; !found { resourceLabels["project_id"] = config.GceConfig.Project } if _, found := resourceLabels["cluster_name"]; !found { resourceLabels["cluster_name"] = config.GceConfig.Cluster } if _, found := resourceLabels["location"]; !found { resourceLabels["location"] = config.GceConfig.ClusterLocation } // When MonitoredResource type is not "k8s_*", default "instance_id" label to GCE instance name. if prefix != "k8s_" { if _, found := resourceLabels["instance_id"]; !found { resourceLabels["instance_id"] = config.GceConfig.InstanceId } } // When namespace and pod are unspecified, it should be written to node type. if namespace == "" || pod == "" || pod == "machine" { // When MonitoredResource is "k8s_node", default "node_name" label to GCE instance name. if prefix == "k8s_" { if _, found := resourceLabels["node_name"]; !found { resourceLabels["node_name"] = config.GceConfig.Instance } } return &monitoredres.MonitoredResource{ Type: prefix + "node", Labels: resourceLabels, } } resourceLabels["namespace_name"] = namespace resourceLabels["pod_name"] = pod if container == "" { return &monitoredres.MonitoredResource{ Type: prefix + "pod", Labels: resourceLabels, } } resourceLabels["container_name"] = container return &monitoredres.MonitoredResource{ Type: prefix + "container", Labels: resourceLabels, } } func getCustomMonitoredResource(config *config.CommonConfig) *monitoredres.MonitoredResource { resourceLabels := config.SourceConfig.CustomLabels applyDefaultIfEmpty(resourceLabels, "instance_id", config.GceConfig.InstanceId) applyDefaultIfEmpty(resourceLabels, "project_id", config.GceConfig.Project) applyDefaultIfEmpty(resourceLabels, "cluster_name", config.GceConfig.Cluster) applyDefaultIfEmpty(resourceLabels, "location", config.GceConfig.ClusterLocation) applyDefaultIfEmpty(resourceLabels, "node_name", config.GceConfig.Instance) return &monitoredres.MonitoredResource{ Type: config.SourceConfig.CustomResourceType, Labels: resourceLabels, } } func applyDefaultIfEmpty(resourceLabels map[string]string, key, defaultValue string) { if val, found := resourceLabels[key]; found && val == "" { resourceLabels[key] = defaultValue } }