custom-metrics-stackdriver-adapter/pkg/adapter/coreprovider/client.go (117 lines of code) (raw):

package coreprovider import ( translator "github.com/GoogleCloudPlatform/k8s-stackdriver/custom-metrics-stackdriver-adapter/pkg/adapter/translator" stackdriver "google.golang.org/api/monitoring/v3" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" "k8s.io/klog" "sigs.k8s.io/metrics-server/pkg/api" ) const ( metricKindCPU = "CUMULATIVE" metricKindRAM = "GAUGE" metricValueTypeCPU = "DOUBLE" metricValueTypeRAM = "INT64" containerCPUMetricName = "kubernetes.io/container/cpu/core_usage_time" containerRAMMetricName = "kubernetes.io/container/memory/used_bytes" nodeCPUMetricName = "kubernetes.io/node/cpu/core_usage_time" nodeRAMMetricName = "kubernetes.io/node/memory/used_bytes" ) // For testing proposes type doRequestFunction interface { do(*stackdriver.ProjectsTimeSeriesListCall) (*stackdriver.ListTimeSeriesResponse, error) } type regularDo struct{} func (d *regularDo) do(stackdriverRequest *stackdriver.ProjectsTimeSeriesListCall) (*stackdriver.ListTimeSeriesResponse, error) { return stackdriverRequest.Do() } type stackdriverCoreClient struct { translator *translator.Translator doRequest doRequestFunction } func newClient(translator *translator.Translator) *stackdriverCoreClient { return &stackdriverCoreClient{ translator: translator, doRequest: &regularDo{}, } } func (p *stackdriverCoreClient) getPodMetric(podsNames []string, metricName, metricKind, metricValueType string, labels labels.Selector) (map[string]map[string]resource.Quantity, map[string]api.TimeInfo, error) { numOfRequests := (len(podsNames) + translator.MaxNumOfArgsInOneOfFilter - 1) / translator.MaxNumOfArgsInOneOfFilter // ceil r := translator.NewPodResult(p.translator) for i := 0; i < numOfRequests; i++ { segmentBeg := i * translator.MaxNumOfArgsInOneOfFilter segmentEnd := min((i+1)*translator.MaxNumOfArgsInOneOfFilter, len(podsNames)) stackdriverRequest, err := translator.NewQueryBuilder(p.translator, metricName). AsContainerType(). WithPodNames(podsNames[segmentBeg:segmentEnd]). WithMetricKind(metricKind). WithMetricValueType(metricValueType). WithMetricSelector(labels). WithNamespace(translator.AllNamespaces). Build() if err != nil { return nil, nil, err } response, err := p.doRequest.do(stackdriverRequest) // TODO: make this calls parallel if err != nil { return nil, nil, err } err = r.AddCoreContainerMetricFromResponse(response) if err != nil { return nil, nil, err } } return r.ContainerMetric, r.TimeInfo, nil } func (p *stackdriverCoreClient) getContainerCPU(podsNames []string) (map[string]map[string]resource.Quantity, map[string]api.TimeInfo, error) { return p.getPodMetric(podsNames, containerCPUMetricName, metricKindCPU, metricValueTypeCPU, labels.Everything()) } func (p *stackdriverCoreClient) getContainerRAM(podsNames []string) (map[string]map[string]resource.Quantity, map[string]api.TimeInfo, error) { return p.getPodMetric(podsNames, containerRAMMetricName, metricKindRAM, metricValueTypeRAM, ramNonEvictableLabel()) } func (p *stackdriverCoreClient) getNodeMetric(nodeNames []string, metricName, metricKind, metricValueType string, labels labels.Selector) (map[string]resource.Quantity, map[string]api.TimeInfo, error) { numOfRequests := (len(nodeNames) + translator.MaxNumOfArgsInOneOfFilter - 1) / translator.MaxNumOfArgsInOneOfFilter // ceil r := translator.NewNodeResult(p.translator) for i := 0; i < numOfRequests; i++ { segmentBeg := i * translator.MaxNumOfArgsInOneOfFilter segmentEnd := min((i+1)*translator.MaxNumOfArgsInOneOfFilter, len(nodeNames)) stackdriverRequest, err := translator.NewQueryBuilder(p.translator, metricName). WithNodeNames(nodeNames[segmentBeg:segmentEnd]). WithMetricKind(metricKind). WithMetricValueType(metricValueType). WithMetricSelector(labels). Build() if err != nil { return nil, nil, err } response, err := p.doRequest.do(stackdriverRequest) // TODO: make this calls parallel if err != nil { return nil, nil, err } err = r.AddCoreNodeMetricFromResponse(response) if err != nil { return nil, nil, err } } return r.NodeMetric, r.TimeInfo, nil } func (p *stackdriverCoreClient) getNodeCPU(nodesNames []string) (map[string]resource.Quantity, map[string]api.TimeInfo, error) { return p.getNodeMetric(nodesNames, nodeCPUMetricName, metricKindCPU, metricValueTypeCPU, labels.Everything()) } func (p *stackdriverCoreClient) getNodeRAM(nodesNames []string) (map[string]resource.Quantity, map[string]api.TimeInfo, error) { return p.getNodeMetric(nodesNames, nodeRAMMetricName, metricKindRAM, metricValueTypeRAM, ramNonEvictableLabel()) } func ramNonEvictableLabel() labels.Selector { ramMetricLabels := labels.Everything() req, err := labels.NewRequirement("metric.labels.memory_type", selection.Equals, []string{"non-evictable"}) if err != nil { klog.Fatalf("Internal error. Requirement build failed. This shouldn't happen.") } return ramMetricLabels.Add(*req) } func min(a, b int) int { if a < b { return a } return b }