pkg/operator/endpoint_status_builder.go (239 lines of code) (raw):
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package operator
import (
"fmt"
"sort"
"strconv"
"strings"
monitoringv1 "github.com/GoogleCloudPlatform/prometheus-engine/pkg/operator/apis/monitoring/v1"
prometheusv1 "github.com/prometheus/client_golang/api/prometheus/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
const (
	// maxSampleTargetSize caps how many sample targets are kept per error
	// group in the reported endpoint status (see scrapeEndpointStatusBuilder.build).
	maxSampleTargetSize = 5
)
// buildEndpointStatuses aggregates per-collector target results into scrape
// endpoint statuses, keyed by the owning resource key (see scrapePool). A nil
// entry in targets counts as a failed collector.
func buildEndpointStatuses(targets []*prometheusv1.TargetsResult) (map[string][]monitoringv1.ScrapeEndpointStatus, error) {
	builder := &scrapeEndpointBuilder{
		mapByKeyByEndpoint: map[string]map[string]*scrapeEndpointStatusBuilder{},
		time:               metav1.Now(),
	}
	for _, target := range targets {
		err := builder.add(target)
		if err != nil {
			return nil, err
		}
	}
	return builder.build(), nil
}
// scrapeEndpointBuilder accumulates target results from all collectors and
// groups them into endpoint statuses.
type scrapeEndpointBuilder struct {
	// mapByKeyByEndpoint indexes status builders first by resource key
	// (e.g. "PodMonitoring/namespace/name") and then by endpoint group.
	mapByKeyByEndpoint map[string]map[string]*scrapeEndpointStatusBuilder
	// total counts all collector results seen; failed counts the nil ones.
	total uint32
	failed uint32
	// time is the single timestamp stamped on every produced status.
	time metav1.Time
}
// add folds one collector's target result into the builder. A nil result is
// recorded as a failed collector; both cases count toward the total.
func (b *scrapeEndpointBuilder) add(target *prometheusv1.TargetsResult) error {
	b.total++
	if target == nil {
		b.failed++
		return nil
	}
	for _, active := range target.Active {
		if err := b.addActiveTarget(active, b.time); err != nil {
			return err
		}
	}
	return nil
}
// setNamespacedObjectByScrapeJobKey fills o's namespace and name from a
// "kind/namespace/name" key split; any other shape is an error.
func setNamespacedObjectByScrapeJobKey(o monitoringv1.PodMonitoringCRD, split []string, full string) (monitoringv1.PodMonitoringCRD, error) {
	// Expect exactly "kind/namespace/name".
	if len(split) != 3 {
		return nil, fmt.Errorf("invalid %s scrape key format %q", split[0], full)
	}
	namespace, name := split[1], split[2]
	o.SetNamespace(namespace)
	o.SetName(name)
	return o, nil
}
// setClusterScopedObjectByScrapeJobKey fills o's name from a "kind/name" key
// split; any other shape is an error.
func setClusterScopedObjectByScrapeJobKey(o monitoringv1.PodMonitoringCRD, split []string, full string) (monitoringv1.PodMonitoringCRD, error) {
	// Expect exactly "kind/name".
	if len(split) != 2 {
		return nil, fmt.Errorf("invalid %s scrape key format %q", split[0], full)
	}
	o.SetName(split[1])
	return o, nil
}
// getObjectByScrapeJobKey converts the key to a CRD. See monitoringv1.PodMonitoringCRD.GetKey().
//
// Key shapes:
//   - "kind" for scrape pools without a respective CRD.
//   - "kind/name" for cluster-scoped resources.
//   - "kind/namespace/name" for namespaced resources.
//
// A (nil, nil) return means the key is valid but carries no status-bearing CRD
// (kubelet, ClusterNodeMonitoring).
func getObjectByScrapeJobKey(key string) (monitoringv1.PodMonitoringCRD, error) {
	split := strings.Split(key, "/")
	switch split[0] {
	case "kubelet":
		if len(split) != 1 {
			return nil, fmt.Errorf("invalid kubelet scrape key format %q", key)
		}
		return nil, nil
	case "PodMonitoring":
		return setNamespacedObjectByScrapeJobKey(&monitoringv1.PodMonitoring{}, split, key)
	case "ClusterPodMonitoring":
		return setClusterScopedObjectByScrapeJobKey(&monitoringv1.ClusterPodMonitoring{}, split, key)
	case "ClusterNodeMonitoring":
		// Validate the "kind/name" shape directly instead of allocating an
		// unrelated ClusterPodMonitoring object purely for validation, as the
		// previous code did. Error message matches the shared helper's format.
		if len(split) != 2 {
			return nil, fmt.Errorf("invalid %s scrape key format %q", split[0], key)
		}
		return nil, nil
	default:
		return nil, fmt.Errorf("unknown scrape kind %q", split[0])
	}
}
// scrapePool is the parsed Prometheus scrape pool, which is assigned to the job name from our
// configurations. For example, for PodMonitoring this is `PodMonitoring/namespace/name/port`. The
// key is what identifies the resource (`PodMonitoring/namespace/name`) and the group indicates a
// small subset of that resource (`port`).
type scrapePool struct {
	// key identifies the owning resource, e.g. "PodMonitoring/namespace/name".
	key string
	// group is the remaining suffix of the pool name (for CRD-derived pools it
	// keeps the leading "/", e.g. "/port"; for kubelet it is the bare group).
	group string
}
// getNamespacedScrapePool splits a "kind/namespace/name/..." pool into its
// resource key ("kind/namespace/name") and the remaining group suffix.
func getNamespacedScrapePool(full string, split []string) scrapePool {
	// Length of "kind/namespace/name" including its two separating slashes.
	keyLen := len(split[0]) + len(split[1]) + len(split[2]) + 2
	return scrapePool{key: full[:keyLen], group: full[keyLen:]}
}
// getClusterScopedScrapePool splits a "kind/name/..." pool into its resource
// key ("kind/name") and the remaining group suffix.
//
// Fixes the previous comment, which described the key as "kind/namespace":
// cluster-scoped resources have no namespace, so split[1] is the name.
func getClusterScopedScrapePool(full string, split []string) scrapePool {
	// Same as: len(strings.Join(split[:2], "/")) for "kind/name".
	index := len(split[0]) + 1 + len(split[1])
	return scrapePool{
		key:   full[:index],
		group: full[index:],
	}
}
// parseScrapePool parses a Prometheus scrape pool name into its resource key
// and group, validating the expected number of "/"-separated parts per kind.
func parseScrapePool(pool string) (scrapePool, error) {
	parts := strings.Split(pool, "/")
	kind := parts[0]
	switch kind {
	case "kubelet":
		if len(parts) == 2 {
			return scrapePool{key: kind, group: parts[1]}, nil
		}
		return scrapePool{}, fmt.Errorf("invalid kubelet scrape pool format %q", pool)
	case "PodMonitoring":
		// Namespaced: "PodMonitoring/namespace/name/port".
		if len(parts) == 4 {
			return getNamespacedScrapePool(pool, parts), nil
		}
		return scrapePool{}, fmt.Errorf("invalid PodMonitoring scrape pool format %q", pool)
	case "ClusterPodMonitoring":
		// Cluster-scoped: "ClusterPodMonitoring/name/port".
		if len(parts) == 3 {
			return getClusterScopedScrapePool(pool, parts), nil
		}
		return scrapePool{}, fmt.Errorf("invalid ClusterPodMonitoring scrape pool format %q", pool)
	case "ClusterNodeMonitoring":
		// Cluster-scoped with an optional extra group segment.
		if len(parts) == 3 || len(parts) == 4 {
			return getClusterScopedScrapePool(pool, parts), nil
		}
		return scrapePool{}, fmt.Errorf("invalid ClusterNodeMonitoring scrape pool format %q", pool)
	default:
		return scrapePool{}, fmt.Errorf("unknown scrape kind %q", kind)
	}
}
// addActiveTarget records one active target under its parsed scrape-pool key
// and group, creating the per-key and per-group builders on first sight.
// Returns an error only if the target's scrape pool name cannot be parsed.
func (b *scrapeEndpointBuilder) addActiveTarget(activeTarget prometheusv1.ActiveTarget, time metav1.Time) error {
	scrapePool, err := parseScrapePool(activeTarget.ScrapePool)
	if err != nil {
		return err
	}
	mapByEndpoint, ok := b.mapByKeyByEndpoint[scrapePool.key]
	if !ok {
		// Assign the new map directly; the previous intermediate `tmp`
		// variable added nothing.
		mapByEndpoint = make(map[string]*scrapeEndpointStatusBuilder)
		b.mapByKeyByEndpoint[scrapePool.key] = mapByEndpoint
	}
	statusBuilder, exists := mapByEndpoint[scrapePool.group]
	if !exists {
		statusBuilder = newScrapeEndpointStatusBuilder(&activeTarget, time)
		mapByEndpoint[scrapePool.group] = statusBuilder
	}
	statusBuilder.addSampleTarget(&activeTarget)
	return nil
}
// build finalizes the aggregation into per-resource-key endpoint statuses,
// each annotated with the fraction of collectors that responded successfully.
func (b *scrapeEndpointBuilder) build() map[string][]monitoringv1.ScrapeEndpointStatus {
	// Guard the division: with zero collector results the old code formatted
	// 0/0 as "NaN". Fall back to "0" instead.
	collectorsFraction := "0"
	if b.total > 0 {
		fraction := float64(b.total-b.failed) / float64(b.total)
		collectorsFraction = strconv.FormatFloat(fraction, 'f', -1, 64)
	}
	resultMap := make(map[string][]monitoringv1.ScrapeEndpointStatus)
	for key, endpointMap := range b.mapByKeyByEndpoint {
		endpointStatuses := make([]monitoringv1.ScrapeEndpointStatus, 0, len(endpointMap))
		for _, statusBuilder := range endpointMap {
			endpointStatus := statusBuilder.build()
			endpointStatus.CollectorsFraction = collectorsFraction
			endpointStatuses = append(endpointStatuses, endpointStatus)
		}
		// Map iteration order is random; sort by name to make the endpoint
		// status deterministic.
		sort.SliceStable(endpointStatuses, func(i, j int) bool {
			return endpointStatuses[i].Name < endpointStatuses[j].Name
		})
		resultMap[key] = endpointStatuses
	}
	return resultMap
}
// scrapeEndpointStatusBuilder accumulates sample targets for one scrape
// endpoint, grouping them by their last-error string.
type scrapeEndpointStatusBuilder struct {
	// status is the in-progress endpoint status; counters and sample groups
	// are filled in as targets are added.
	status monitoringv1.ScrapeEndpointStatus
	// groupByError buckets sample targets by their LastError string
	// ("" for healthy targets).
	groupByError map[string]*monitoringv1.SampleGroup
}
// newScrapeEndpointStatusBuilder starts a status builder for the target's
// scrape pool with zeroed counters and the given observation time.
func newScrapeEndpointStatusBuilder(target *prometheusv1.ActiveTarget, time metav1.Time) *scrapeEndpointStatusBuilder {
	status := monitoringv1.ScrapeEndpointStatus{
		Name:               target.ScrapePool,
		LastUpdateTime:     time,
		CollectorsFraction: "0",
	}
	return &scrapeEndpointStatusBuilder{
		status:       status,
		groupByError: map[string]*monitoringv1.SampleGroup{},
	}
}
// addSampleTarget folds one target into the builder, bucketing it by its last
// error. Unhealthy targets bump the unhealthy counter; a healthy target with
// an empty error reports no LastError at all.
func (b *scrapeEndpointStatusBuilder) addSampleTarget(target *prometheusv1.ActiveTarget) {
	b.status.ActiveTargets++
	errKey := target.LastError
	lastError := &errKey
	if target.Health != "up" {
		b.status.UnhealthyTargets++
	} else if errKey == "" {
		// Healthy and error-free: omit the error pointer entirely.
		lastError = nil
	}
	sampleTarget := monitoringv1.SampleTarget{
		Health:                    string(target.Health),
		LastError:                 lastError,
		Labels:                    target.Labels,
		LastScrapeDurationSeconds: strconv.FormatFloat(target.LastScrapeDuration, 'f', -1, 64),
	}
	group, found := b.groupByError[errKey]
	if !found {
		group = &monitoringv1.SampleGroup{
			SampleTargets: []monitoringv1.SampleTarget{},
			Count:         new(int32),
		}
		b.groupByError[errKey] = group
	}
	*group.Count++
	group.SampleTargets = append(group.SampleTargets, sampleTarget)
}
// build produces the final status with deterministic array ordering: each
// group's sample targets are sorted by instance label and truncated to
// maxSampleTargetSize, and the groups themselves are sorted by their shared
// error string, with the error-free (nil) group last.
func (b *scrapeEndpointStatusBuilder) build() monitoringv1.ScrapeEndpointStatus {
	for _, group := range b.groupByError {
		targets := group.SampleTargets
		sort.SliceStable(targets, func(i, j int) bool {
			// Every sample target is guaranteed to have an instance label.
			return targets[i].Labels["instance"] < targets[j].Labels["instance"]
		})
		// Keep only a small representative sample per error group.
		if len(targets) > maxSampleTargetSize {
			targets = targets[:maxSampleTargetSize]
		}
		group.SampleTargets = targets
		b.status.SampleGroups = append(b.status.SampleGroups, *group)
	}
	sort.SliceStable(b.status.SampleGroups, func(i, j int) bool {
		// Every target within a group shares the same error, so the first
		// element is representative. nil (healthy) sorts after all errors.
		lhs := b.status.SampleGroups[i].SampleTargets[0].LastError
		rhs := b.status.SampleGroups[j].SampleTargets[0].LastError
		switch {
		case lhs == nil:
			return false
		case rhs == nil:
			return true
		default:
			return *lhs < *rhs
		}
	})
	return b.status
}