custom-metrics-stackdriver-adapter/pkg/adapter/translator/query_builder.go (585 lines of code) (raw):

/* Copyright 2017 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package translator import ( "fmt" "strconv" "strings" "time" "github.com/GoogleCloudPlatform/k8s-stackdriver/custom-metrics-stackdriver-adapter/pkg/adapter/translator/utils" "github.com/GoogleCloudPlatform/k8s-stackdriver/custom-metrics-stackdriver-adapter/pkg/config" stackdriver "google.golang.org/api/monitoring/v3" v1 "k8s.io/api/core/v1" apierr "k8s.io/apimachinery/pkg/api/errors" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" "k8s.io/klog" ) var ( // allowedExternalMetricsLabelPrefixes and allowedExternalMetricsFullLabelNames specify all metric labels allowed for querying // External Metrics API. allowedExternalMetricsLabelPrefixes = []string{"metric.labels", "resource.labels", "metadata.system_labels", "metadata.user_labels"} allowedExternalMetricsFullLabelNames = []string{"resource.type", "reducer"} // allowedCustomMetricsLabelPrefixes and allowedCustomMetricsFullLabelNames specify all metric labels allowed for querying allowedCustomMetricsLabelPrefixes = []string{"metric.labels"} allowedCustomMetricsFullLabelNames = []string{"reducer"} allowedReducers = map[string]bool{ "REDUCE_NONE": true, "REDUCE_MEAN": true, "REDUCE_MIN": true, "REDUCE_MAX": true, "REDUCE_SUM": true, "REDUCE_STDDEV": true, "REDUCE_COUNT": true, "REDUCE_COUNT_TRUE": true, "REDUCE_COUNT_FALSE": true, "REDUCE_FRACTION_TRUE": true, "REDUCE_PERCENTILE_99": true, "REDUCE_PERCENTILE_95": true, "REDUCE_PERCENTILE_50": true, "REDUCE_PERCENTILE_05": true, } ) const ( // AllNamespaces is constant to indicate that there is no namespace filter in query AllNamespaces = "" // MaxNumOfArgsInOneOfFilter is the maximum value of one_of() function allowed in Stackdriver Filters MaxNumOfArgsInOneOfFilter = 100 // PrometheusMetricPrefix is the prefix for prometheus metrics PrometheusMetricPrefix = "prometheus.googleapis.com" ) type clock interface { Now() time.Time } type realClock struct{} func (c realClock) Now() time.Time { return time.Now() } // Translator is a structure used to translate between Custom Metrics API and Stackdriver API type Translator struct { service *stackdriver.Service config *config.GceConfig reqWindow time.Duration alignmentPeriod time.Duration clock clock mapper apimeta.RESTMapper useNewResourceModel bool supportDistributions bool } // podValues is a helper struct to hold pods values type podValues struct { pods *v1.PodList podNames []string } // isPodValuesValid checks if podValues is valid to be used // // this only happens when the both pods & podNames are provided, // for example, when WithPods() and WithPodNames() are both used in QueryBuilder func (pc podValues) isPodValuesValid() bool { return !(len(pc.podNames) > 0 && pc.pods != nil && len(pc.pods.Items) > 0) } // isPodValuesEmpty checks if pods container is empty func (pc podValues) isPodValuesEmpty() bool { return len(pc.podNames) == 0 && (pc.pods == nil || len(pc.pods.Items) == 0) } // getQuotedPodNames gets quoted pod names from podValues if any provided func (pc podValues) getQuotedPodNames() []string { if pc.pods == nil { return pc.podNames } podNames := make([]string, len(pc.pods.Items)) for i, item := range pc.pods.Items { podNames[i] = fmt.Sprintf("%q", item.GetName()) } return podNames } // getPodIDs gets pod ids from podValues if any provided func (pc podValues) getPodIDs() []string { if pc.pods == nil { return []string{} } podIDs := make([]string, len(pc.pods.Items)) for i, item := range pc.pods.Items { podIDs[i] = fmt.Sprintf("%q", item.GetUID()) } return podIDs } // nodeValues is a helper struct to hold nodes values type nodeValues struct { nodes *v1.NodeList nodeNames []string } // isNodeValuesValid checks if nodeContainer is valid to be used // // this only happens when the both nodes & nodeNames are provided, // for example, when WithNodes() and WithNodeNames() are both used in QueryBuilder func (nc nodeValues) isNodeValuesValid() bool { return !(len(nc.nodeNames) > 0 && nc.nodes != nil && len(nc.nodes.Items) > 0) } // isNodeValuesEmpty checks if nodes container is empty func (nc nodeValues) isNodeValuesEmpty() bool { return len(nc.nodeNames) == 0 && (nc.nodes == nil || len(nc.nodes.Items) == 0) } // getNodeNames gets node names from nodeContainer if any provided func (nc nodeValues) getNodeNames() []string { if nc.nodes != nil { nodeNames := make([]string, len(nc.nodes.Items)) for i, item := range nc.nodes.Items { nodeNames[i] = item.GetName() } return nodeNames } return nc.nodeNames } // QueryBuilder is a builder for ProjectsTimeSeriesListCall // // use NewQueryBuilder() to initialize type QueryBuilder struct { translator *Translator // translator provides configurations to filter metricName string // metricName is the metric name to filter metricKind string // metricKind is the metric kind to filter metricValueType string // metricValueType is the metric value type to filter metricSelector labels.Selector // metricSelector is the metric selector to filtere namespace string // namespace is the namespace to filter (mutually exclusive with nodes) pods podValues // pods is the pods to filter (mutually exclusive with nodes) nodes nodeValues // nodes is the nodes to filter (mutually exclusive with namespace, pods) enforceContainerType bool // enforceContainerType decides whether to enforce using container type filter schema } // NewQueryBuilder is the initiator for QueryBuilder // // Parameters: // - translator, required for configurations. // - metricName, required to determine the query schema. // // Example: // // queryBuilder := NewQueryBuilder(NewTranslator(...), "custom.googleapis.com/foo") func NewQueryBuilder(translator *Translator, metricName string) QueryBuilder { return QueryBuilder{ translator: translator, metricName: metricName, } } // WithMetricKind adds a metric kind filter to the QueryBuilder // // Example: // // queryBuilder := NewQueryBuilder(translator, metricName).WithMetricKind("GAUGE") func (qb QueryBuilder) WithMetricKind(metricKind string) QueryBuilder { qb.metricKind = metricKind return qb } // WithMetricValueType adds a metric value type filter to the QueryBuilder // // Example: // // queryBuilder := NewQueryBuilder(translator, metricName).WithMetricValueType("INT64") func (qb QueryBuilder) WithMetricValueType(metricValueType string) QueryBuilder { qb.metricValueType = metricValueType return qb } // WithMetricSelector adds a metric selector filter to the QueryBuilder // // Example: // // // labels comes from "k8s.io/apimachinery/pkg/labels" // metricSelector, _ := labels.Parse("metric.labels.custom=test") // queryBuilder := NewQueryBuilder(translator, metricName).WithMetricSelector(metricSelector) func (qb QueryBuilder) WithMetricSelector(metricSelector labels.Selector) QueryBuilder { qb.metricSelector = metricSelector return qb } // WithNamespace adds a namespace filter to to the QueryBuilder // // - CANNOT be used with WithNodes // // Example: // // queryBuilder := NewQueryBuilder(translator, metricName).WithNamespace("gmp-test") func (qb QueryBuilder) WithNamespace(namespace string) QueryBuilder { qb.namespace = namespace return qb } // WithPods adds a pod filter to the QueryBuilder (used when namespace is NOT empty) // // # ONLY one among WithPods, WithPodNames and WithNodes should be used // // Example: // // // v1 comes from "k8s.io/api/core/v1" // pod := v1.Pod{ // ObjectMeta: metav1.ObjectMeta{ // ClusterName: "my-cluster", // UID: "my-pod-id", // Name: "my-pod-name", // }, // } // queryBuilder := NewQueryBuilder(translator, metricName).WithPods(&v1.PodList{Items: []v1.Pod{pod}}) func (qb QueryBuilder) WithPods(pods *v1.PodList) QueryBuilder { qb.pods.pods = pods return qb } // WithPodNames adds a pod filter to the QueryBuilder (used when namespace is NOT empty) // // # ONLY one among WithPods, WithPodNames and WithNodes should be used // // Example: // // podNames := []string{"pod-1", "pod-2"} // queryBuilder := NewQueryBuilder(translator, metricName).WithPodNames(podNames) func (qb QueryBuilder) WithPodNames(podNames []string) QueryBuilder { qb.pods.podNames = podNames return qb } // WithNodes adds a node filter to the QueryBuilder (used when namespace is empty) // // - ONLY one among WithPods, WithPodNames and WithNodes should be used // - CANNOT be used with WithNamespace // // Exmaple: // // // v1 comes from "k8s.io/api/core/v1" // node := v1.Node{ // ObjectMeta: metav1.ObjectMeta{ // ClusterName: "my-cluster", // UID: "my-node-id-1", // Name: "my-node-name-1", // }, // } // queryBuilder := NewQueryBuilder(translator, metricName).WithNodes(&v1.NodeList{Items: []v1.Node{node}}) func (qb QueryBuilder) WithNodes(nodes *v1.NodeList) QueryBuilder { qb.nodes.nodes = nodes return qb } func (qb QueryBuilder) WithNodeNames(nodeNames []string) QueryBuilder { qb.nodes.nodeNames = nodeNames return qb } // getResourceNames is an internal helper function to convert pods or nodes to podNames or nodeNames func (qb QueryBuilder) getResourceNames() []string { if qb.translator.useNewResourceModel { // new resource model if !qb.pods.isPodValuesEmpty() { // pods return qb.pods.getQuotedPodNames() } // nodes return qb.nodes.getNodeNames() } // legacy resource model return qb.pods.getPodIDs() } // AsContainerType enforces to query k8s_container type metrics // // it it valid only when useNewResourceModel is true func (qb QueryBuilder) AsContainerType() QueryBuilder { qb.enforceContainerType = true return qb } // validate is an internal helper function for checking prerequisits before Build // // Criteria: // - translator has to be provided // - only one of nodes or namespace should be set. // - only one from podNames, pods, nodes should be set // - podList is required to be no longer than MaxNumOfArgsInOneOfFilter items. This is enforced by limitation of // "one_of()" operator in Stackdriver filters, see documentation: "https://cloud.google.com/monitoring/api/v3/filters" // - metric value type cannot be "DISTRIBUTION" while translator does not support distribution // - container type filter schema cannot be used on the legacy resource model func (qb QueryBuilder) validate() error { if qb.translator == nil { return apierr.NewInternalError(fmt.Errorf("QueryBuilder tries to build with translator value: nil")) } if !qb.nodes.isNodeValuesEmpty() { // node metric if !qb.nodes.isNodeValuesValid() { return apierr.NewInternalError(fmt.Errorf("invalid nodes parameter is set to QueryBuilder")) } if qb.namespace != "" { return apierr.NewInternalError(fmt.Errorf("both nodes and namespace are provided, expect only one of them.")) } if !qb.pods.isPodValuesEmpty() { return apierr.NewInternalError(fmt.Errorf("both nodes and pods are provided, expect only one of them.")) } } else { // pod metric if qb.pods.isPodValuesEmpty() { return apierr.NewInternalError(fmt.Errorf("no resources are specified for QueryBuilder, expected one of nodes or pods should be used")) } if !qb.pods.isPodValuesValid() { return apierr.NewInternalError(fmt.Errorf("invalid pods parameter is set to QueryBuilder")) } numPods := len(qb.pods.getQuotedPodNames()) if numPods > MaxNumOfArgsInOneOfFilter { return apierr.NewInternalError(fmt.Errorf("QueryBuilder tries to build with %v pod list, but allowed limit is %v pods", numPods, MaxNumOfArgsInOneOfFilter)) } } if qb.metricValueType == "DISTRIBUTION" && !qb.translator.supportDistributions { return apierr.NewBadRequest("distributions are not supported") } if qb.enforceContainerType && !qb.translator.useNewResourceModel { return apierr.NewInternalError(fmt.Errorf("illegal state! Container metrics works only with new resource model")) } return nil } // getFilterBuilder is an internal helper function to decide which type of FilterBuilder to use // // Prorities: // 1. if useNewResourceModel is false, then use legacy type FilterBuiler // 2. if enforceContainerType is true, then use container type FilterBuilder // 3. if metricName is prefixed with PrometheusMetricPrefix, then use prometheus type FilterBuilder // 4. if namespace is empty, then use node type FilterBuilder // 5. By default, use pod type FilterBuilder func (qb QueryBuilder) getFilterBuilder() utils.FilterBuilder { // legacy type FilterBuilder if !qb.translator.useNewResourceModel { return utils.NewFilterBuilder(utils.SchemaTypes[utils.LegacySchemaKey]) } // container type FilterBuilder if qb.enforceContainerType { return utils.NewFilterBuilder(utils.SchemaTypes[utils.ContainerSchemaKey]) } // prometheus type FilterBuilder if strings.HasPrefix(qb.metricName, PrometheusMetricPrefix) { return utils.NewFilterBuilder(utils.SchemaTypes[utils.PrometheusSchemaKey]) } // node type FilterBuilder if qb.namespace == "" { return utils.NewFilterBuilder(utils.SchemaTypes[utils.NodeSchemaKey]) } // pod type FilterBuilder return utils.NewFilterBuilder(utils.SchemaTypes[utils.PodSchemaKey]) } // composeFilter is an internal helper function to compose filter criteria // when namespace is NOT empty func (qb QueryBuilder) composeFilter() string { filterBuilder := qb.getFilterBuilder(). WithMetricType(qb.metricName). WithProject(qb.translator.config.Project). WithCluster(qb.translator.config.Cluster) resourceNames := qb.getResourceNames() if qb.translator.useNewResourceModel { // new resource model specific filters filterBuilder = filterBuilder.WithLocation(qb.translator.config.Location) if !qb.nodes.isNodeValuesEmpty() { // node metrics return filterBuilder.WithNodes(resourceNames).Build() } // pod metrics return filterBuilder. WithNamespace(qb.namespace). WithPods(resourceNames). Build() } // legacy resource model specific filters return filterBuilder. WithContainer(). WithPods(resourceNames). Build() } // Build is the last step for QueryBuilder which converts itself into a ProjectsTimeSeriesListCall object // - has to pass the prerequisits specified in QueryBuilder.validate() // - uses the query schema based on useNewResourceModel from translator as well as the metric name. // - composes provided filters using the internal tool FilterBuilder // // Example: // // projectsTimeSeriesListCall, error = NewQueryBuilder(translator, metricName).Build() func (qb QueryBuilder) Build() (*stackdriver.ProjectsTimeSeriesListCall, error) { if err := qb.validate(); err != nil { return nil, err } filter := qb.composeFilter() if qb.metricSelector.Empty() { return qb.translator.createListTimeseriesRequest(filter, qb.metricKind, qb.metricValueType, ""), nil } filterForSelector, reducer, err := qb.translator.filterForSelector(qb.metricSelector, allowedCustomMetricsLabelPrefixes, allowedCustomMetricsFullLabelNames) if err != nil { return nil, err } return qb.translator.createListTimeseriesRequest(joinFilters(filterForSelector, filter), qb.metricKind, qb.metricValueType, reducer), nil } // NewTranslator creates a Translator func NewTranslator(service *stackdriver.Service, gceConf *config.GceConfig, rateInterval time.Duration, alignmentPeriod time.Duration, mapper apimeta.RESTMapper, useNewResourceModel, supportDistributions bool) *Translator { return &Translator{ service: service, config: gceConf, reqWindow: rateInterval, alignmentPeriod: alignmentPeriod, clock: realClock{}, mapper: mapper, useNewResourceModel: useNewResourceModel, supportDistributions: supportDistributions, } } // GetExternalMetricRequest returns Stackdriver request for query for external metric. func (t *Translator) GetExternalMetricRequest(metricName, metricKind, metricValueType string, metricSelector labels.Selector) (*stackdriver.ProjectsTimeSeriesListCall, error) { if metricValueType == "DISTRIBUTION" && !t.supportDistributions { return nil, apierr.NewBadRequest("Distributions are not supported") } metricProject, err := t.GetExternalMetricProject(metricSelector) if err != nil { return nil, err } filterForMetric := t.filterForMetric(metricName) if metricSelector.Empty() { return t.createListTimeseriesRequest(filterForMetric, metricKind, metricValueType, ""), nil } filterForSelector, reducer, err := t.filterForSelector(metricSelector, allowedExternalMetricsLabelPrefixes, allowedExternalMetricsFullLabelNames) if err != nil { return nil, err } return t.createListTimeseriesRequestProject(joinFilters(filterForMetric, filterForSelector), metricKind, metricProject, metricValueType, reducer), nil } // ListMetricDescriptors returns Stackdriver request for all custom metrics descriptors. func (t *Translator) ListMetricDescriptors(fallbackForContainerMetrics bool) *stackdriver.ProjectsMetricDescriptorsListCall { var filter string if t.useNewResourceModel { filter = joinFilters(t.filterForCluster(), t.filterForAnyResource(fallbackForContainerMetrics)) } else { filter = joinFilters(t.legacyFilterForCluster(), t.legacyFilterForAnyPod()) } return t.service.Projects.MetricDescriptors. List(fmt.Sprintf("projects/%s", t.config.Project)). Filter(filter) } // GetMetricKind returns metricKind for metric metricName, obtained from Stackdriver Monitoring API. func (t *Translator) GetMetricKind(metricName string, metricSelector labels.Selector) (string, string, error) { metricProj := t.config.Project requirements, selectable := metricSelector.Requirements() if !selectable { return "", "", apierr.NewBadRequest(fmt.Sprintf("Label selector is impossible to match: %s", metricSelector)) } for _, req := range requirements { if req.Key() == "resource.labels.project_id" { if req.Operator() == selection.Equals || req.Operator() == selection.DoubleEquals { metricProj = req.Values().List()[0] break } return "", "", NewLabelNotAllowedError(fmt.Sprintf("Project selector must use '=' or '==': You used %s", req.Operator())) } } response, err := t.service.Projects.MetricDescriptors.Get(fmt.Sprintf("projects/%s/metricDescriptors/%s", metricProj, metricName)).Do() if err != nil { return "", "", NewNoSuchMetricError(metricName, err) } return response.MetricKind, response.ValueType, nil } // GetExternalMetricProject If the metric has "resource.labels.project_id" as a selector, then use a different project func (t *Translator) GetExternalMetricProject(metricSelector labels.Selector) (string, error) { requirements, _ := metricSelector.Requirements() for _, req := range requirements { if req.Key() == "resource.labels.project_id" { if req.Operator() == selection.Equals || req.Operator() == selection.DoubleEquals { return req.Values().List()[0], nil } return "", NewLabelNotAllowedError(fmt.Sprintf("Project selector must use '=' or '==': You used %s", req.Operator())) } } return t.config.Project, nil } func isAllowedLabelName(labelName string, allowedLabelPrefixes []string, allowedFullLabelNames []string) bool { for _, prefix := range allowedLabelPrefixes { if strings.HasPrefix(labelName, prefix+".") { return true } } for _, name := range allowedFullLabelNames { if labelName == name { return true } } return false } func splitMetricLabel(labelName string, allowedLabelPrefixes []string) (string, string, error) { for _, prefix := range allowedLabelPrefixes { if strings.HasPrefix(labelName, prefix+".") { return prefix, strings.TrimPrefix(labelName, prefix+"."), nil } } return "", "", apierr.NewBadRequest(fmt.Sprintf("Label name: %s is not allowed.", labelName)) } func quoteAll(list []string) []string { result := []string{} for _, item := range list { result = append(result, fmt.Sprintf("%q", item)) } return result } // Deprecated, use FilterBuilder instead func joinFilters(filters ...string) string { nonEmpty := []string{} for _, f := range filters { if f != "" { nonEmpty = append(nonEmpty, f) } } return strings.Join(nonEmpty, " AND ") } // Deprecated, use FilterBuilder instead func (t *Translator) filterForCluster() string { projectFilter := fmt.Sprintf("resource.labels.project_id = %q", t.config.Project) clusterFilter := fmt.Sprintf("resource.labels.cluster_name = %q", t.config.Cluster) locationFilter := fmt.Sprintf("resource.labels.location = %q", t.config.Location) return fmt.Sprintf("%s AND %s AND %s", projectFilter, clusterFilter, locationFilter) } // Deprecated, use FilterBuilder instead func (t *Translator) filterForMetric(metricName string) string { return fmt.Sprintf("metric.type = %q", metricName) } // Deprecated, use FilterBuilder instead func (t *Translator) filterForAnyPod() string { return "resource.type = \"k8s_pod\"" } // Deprecated, use FilterBuilder instead func (t *Translator) filterForAnyNode() string { return "resource.type = \"k8s_node\"" } // Deprecated, use FilterBuilder instead func (t *Translator) filterForAnyContainer() string { return "resource.type = \"k8s_container\"" } // Deprecated, use FilterBuilder instead func (t *Translator) filterForAnyResource(fallbackForContainerMetrics bool) string { if fallbackForContainerMetrics { return "resource.type = one_of(\"k8s_pod\",\"k8s_node\",\"k8s_container\")" } return "resource.type = one_of(\"k8s_pod\",\"k8s_node\")" } // Deprecated, use FilterBuilder instead // The namespace string can be empty. If so, all namespaces are allowed. func (t *Translator) filterForPods(podNames []string, namespace string) string { if len(podNames) == 0 { klog.Fatalf("createFilterForPods called with empty list of pod names") } else if len(podNames) == 1 { if namespace == AllNamespaces { return fmt.Sprintf("resource.labels.pod_name = %s", podNames[0]) } return fmt.Sprintf("resource.labels.namespace_name = %q AND resource.labels.pod_name = %s", namespace, podNames[0]) } if namespace == AllNamespaces { return fmt.Sprintf("resource.labels.pod_name = one_of(%s)", strings.Join(podNames, ",")) } return fmt.Sprintf("resource.labels.namespace_name = %q AND resource.labels.pod_name = one_of(%s)", namespace, strings.Join(podNames, ",")) } // Deprecated, use FilterBuilder instead func (t *Translator) filterForNodes(nodeNames []string) string { if len(nodeNames) == 0 { klog.Fatalf("createFilterForNodes called with empty list of node names") } else if len(nodeNames) == 1 { return fmt.Sprintf("resource.labels.node_name = %s", nodeNames[0]) } return fmt.Sprintf("resource.labels.node_name = one_of(%s)", strings.Join(nodeNames, ",")) } // Deprecated, use FilterBuilder instead func (t *Translator) legacyFilterForCluster() string { projectFilter := fmt.Sprintf("resource.labels.project_id = %q", t.config.Project) // Skip location, since it may be set incorrectly by Heapster for old resource model clusterFilter := fmt.Sprintf("resource.labels.cluster_name = %q", t.config.Cluster) containerFilter := "resource.labels.container_name = \"\"" return fmt.Sprintf("%s AND %s AND %s", projectFilter, clusterFilter, containerFilter) } // Deprecated, use FilterBuilder instead func (t *Translator) legacyFilterForAnyPod() string { return "resource.labels.pod_id != \"\" AND resource.labels.pod_id != \"machine\"" } // Deprecated, use FilterBuilder instead func (t *Translator) legacyFilterForPods(podIDs []string) string { if len(podIDs) == 0 { klog.Fatalf("createFilterForIDs called with empty list of pod IDs") } else if len(podIDs) == 1 { return fmt.Sprintf("resource.labels.pod_id = %s", podIDs[0]) } return fmt.Sprintf("resource.labels.pod_id = one_of(%s)", strings.Join(podIDs, ",")) } func (t *Translator) filterForSelector(metricSelector labels.Selector, allowedLabelPrefixes []string, allowedFullLabelNames []string) (string, string, error) { requirements, selectable := metricSelector.Requirements() if !selectable { return "", "", apierr.NewBadRequest(fmt.Sprintf("Label selector is impossible to match: %s", metricSelector)) } filters := []string{} var reducer string for _, req := range requirements { if req.Key() == "reducer" { if req.Operator() != selection.Equals && req.Operator() != selection.DoubleEquals { return "", "", NewLabelNotAllowedError(fmt.Sprintf("Reducer must use '=' or '==': You used %s", req.Operator())) } if req.Values().Len() != 1 { return "", "", NewLabelNotAllowedError("Reducer must select a single value") } r, found := req.Values().PopAny() if !found { return "", "", NewLabelNotAllowedError("Reducer must specify a value") } if !allowedReducers[r] { return "", "", NewLabelNotAllowedError("Specified reducer is not supported: " + r) } reducer = r continue } l := req.Values().List() switch req.Operator() { case selection.Equals, selection.DoubleEquals: if isAllowedLabelName(req.Key(), allowedLabelPrefixes, allowedFullLabelNames) { filters = append(filters, fmt.Sprintf("%s = %q", req.Key(), l[0])) } else { return "", "", NewLabelNotAllowedError(req.Key()) } case selection.NotEquals: if isAllowedLabelName(req.Key(), allowedLabelPrefixes, allowedFullLabelNames) { filters = append(filters, fmt.Sprintf("%s != %q", req.Key(), l[0])) } else { return "", "", NewLabelNotAllowedError(req.Key()) } case selection.In: if isAllowedLabelName(req.Key(), allowedLabelPrefixes, allowedFullLabelNames) { if len(l) == 1 { filters = append(filters, fmt.Sprintf("%s = %s", req.Key(), l[0])) } else { filters = append(filters, fmt.Sprintf("%s = one_of(%s)", req.Key(), strings.Join(quoteAll(l), ","))) } } else { return "", "", NewLabelNotAllowedError(req.Key()) } case selection.NotIn: if isAllowedLabelName(req.Key(), allowedLabelPrefixes, allowedFullLabelNames) { if len(l) == 1 { filters = append(filters, fmt.Sprintf("%s != %s", req.Key(), l[0])) } else { filters = append(filters, fmt.Sprintf("NOT %s = one_of(%s)", req.Key(), strings.Join(quoteAll(l), ","))) } } else { return "", "", NewLabelNotAllowedError(req.Key()) } case selection.Exists: prefix, suffix, err := splitMetricLabel(req.Key(), allowedLabelPrefixes) if err == nil { filters = append(filters, fmt.Sprintf("%s : %s", prefix, suffix)) } else { return "", "", NewLabelNotAllowedError(req.Key()) } case selection.DoesNotExist: // DoesNotExist is not allowed due to Stackdriver filtering syntax limitation return "", "", apierr.NewBadRequest("Label selector with operator DoesNotExist is not allowed") case selection.GreaterThan: if isAllowedLabelName(req.Key(), allowedLabelPrefixes, allowedFullLabelNames) { value, err := strconv.ParseInt(l[0], 10, 64) if err != nil { return "", "", apierr.NewInternalError(fmt.Errorf("Unexpected error: value %s could not be parsed to integer", l[0])) } filters = append(filters, fmt.Sprintf("%s > %v", req.Key(), value)) } else { return "", "", NewLabelNotAllowedError(req.Key()) } case selection.LessThan: if isAllowedLabelName(req.Key(), allowedLabelPrefixes, allowedFullLabelNames) { value, err := strconv.ParseInt(l[0], 10, 64) if err != nil { return "", "", apierr.NewInternalError(fmt.Errorf("Unexpected error: value %s could not be parsed to integer", l[0])) } filters = append(filters, fmt.Sprintf("%s < %v", req.Key(), value)) } else { return "", "", NewLabelNotAllowedError(req.Key()) } default: return "", "", NewOperationNotSupportedError(fmt.Sprintf("Selector with operator %q", req.Operator())) } } return strings.Join(filters, " AND "), reducer, nil } func (t *Translator) getMetricLabels(series *stackdriver.TimeSeries) map[string]string { metricLabels := map[string]string{} for label, value := range series.Metric.Labels { metricLabels["metric.labels."+label] = value } metricLabels["resource.type"] = series.Resource.Type for label, value := range series.Resource.Labels { metricLabels["resource.labels."+label] = value } return metricLabels } func (t *Translator) createListTimeseriesRequest(filter, metricKind, metricValueType, reducer string) *stackdriver.ProjectsTimeSeriesListCall { return t.createListTimeseriesRequestProject(filter, metricKind, t.config.Project, metricValueType, reducer) } func (t *Translator) createListTimeseriesRequestProject(filter, metricKind, metricProject, metricValueType, reducer string) *stackdriver.ProjectsTimeSeriesListCall { project := fmt.Sprintf("projects/%s", metricProject) endTime := t.clock.Now() startTime := endTime.Add(-t.reqWindow) // use "ALIGN_NEXT_OLDER" by default, i.e. for metricKind "GAUGE" aligner := "ALIGN_NEXT_OLDER" alignmentPeriod := t.reqWindow if metricKind == "DELTA" || metricKind == "CUMULATIVE" { aligner = "ALIGN_RATE" // Calculates integral of metric on segment and divide it by segment length. alignmentPeriod = t.alignmentPeriod } if metricValueType == "DISTRIBUTION" { aligner = "ALIGN_DELTA" } ptslc := t.service.Projects.TimeSeries.List(project).Filter(filter). IntervalStartTime(startTime.Format(time.RFC3339)). IntervalEndTime(endTime.Format(time.RFC3339)). AggregationPerSeriesAligner(aligner). AggregationAlignmentPeriod(fmt.Sprintf("%vs", int64(alignmentPeriod.Seconds()))) if reducer != "" { ptslc = ptslc.AggregationCrossSeriesReducer(reducer) } return ptslc } // GetPodItems returns list Pod Objects func (t *Translator) GetPodItems(list *v1.PodList) []metav1.ObjectMeta { items := []metav1.ObjectMeta{} for _, item := range list.Items { items = append(items, item.ObjectMeta) } return items } // GetNodeItems returns list Node Objects func (t *Translator) GetNodeItems(list *v1.NodeList) []metav1.ObjectMeta { items := []metav1.ObjectMeta{} for _, item := range list.Items { items = append(items, item.ObjectMeta) } return items } func isDistribution(metricSelector labels.Selector) bool { requirements, _ := metricSelector.Requirements() for _, req := range requirements { if req.Key() == "reducer" { return true } } return false }