pkg/operator/target_status.go
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package operator

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"sync"
"time"
monitoringv1 "github.com/GoogleCloudPlatform/prometheus-engine/pkg/operator/apis/monitoring/v1"
"github.com/go-logr/logr"
"github.com/prometheus/client_golang/api"
prometheusv1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/client_golang/prometheus"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/clock"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

var (
targetStatusDuration = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "prometheus_engine_target_status_duration",
Help: "A metric indicating how long it took to fetch the complete target status.",
}, []string{})
// Minimum duration between polls.
minPollDuration = 10 * time.Second
)

// getTargetFn fetches the Prometheus targets exposed by the given collector pod.
type getTargetFn func(ctx context.Context, logger logr.Logger, httpClient *http.Client, port int32, pod *corev1.Pod) (*prometheusv1.TargetsResult, error)

// targetStatusReconciler holds cached client state and the event source channel.
type targetStatusReconciler struct {
ch chan<- event.GenericEvent
opts Options
getTarget getTargetFn
clock clock.Clock
logger logr.Logger
httpClient *http.Client
kubeClient client.Client
}

// setupTargetStatusPoller sets up a reconciler that polls and populates target
// statuses whenever it receives an event.
func setupTargetStatusPoller(op *Operator, registry prometheus.Registerer, httpClient *http.Client) error {
if err := registry.Register(targetStatusDuration); err != nil {
return err
}
ch := make(chan event.GenericEvent, 1)
reconciler := &targetStatusReconciler{
ch: ch,
opts: op.opts,
getTarget: getTarget,
logger: op.logger,
httpClient: httpClient,
kubeClient: op.manager.GetClient(),
clock: clock.RealClock{},
}
err := ctrl.NewControllerManagedBy(op.manager).
Named("target-status").
// controller-runtime requires a For clause for the controller, otherwise
// it fails to build at runtime when calling `Complete`. The reconcile loop
// doesn't strictly need to watch a particular resource as it's performing
// polling against a channel source. We use the DaemonSet here, as it's the
// closest thing to what we're reconciling (i.e. the collector DaemonSet).
For(
&appsv1.DaemonSet{},
// For the (rare) cases where the collector DaemonSet is deleted and
// re-created we don't want this event to reconcile into the
// polling-based control loop.
builder.WithPredicates(predicate.NewPredicateFuncs(func(_ client.Object) bool {
return false
})),
).
WatchesRawSource(
source.Channel(ch, &handler.EnqueueRequestForObject{}),
).
Complete(reconciler)
if err != nil {
return fmt.Errorf("create target status controller: %w", err)
}
// Seed the polling loop with a single initial event once the manager starts.
if err := op.manager.Add(manager.RunnableFunc(func(context.Context) error {
reconciler.ch <- event.GenericEvent{
Object: &appsv1.DaemonSet{},
}
return nil
})); err != nil {
return fmt.Errorf("unable to start target status controller: %w", err)
}
return nil
}

// fetchAllPodMonitorings fetches all ClusterPodMonitoring and PodMonitoring
// CRs deployed in the cluster. This excludes ClusterNodeMonitoring CRs.
func fetchAllPodMonitorings(ctx context.Context, kubeClient client.Client) ([]monitoringv1.PodMonitoringCRD, error) {
var combinedList []monitoringv1.PodMonitoringCRD
var podMonitoringList monitoringv1.PodMonitoringList
if err := kubeClient.List(ctx, &podMonitoringList); err != nil {
return nil, err
}
for _, pm := range podMonitoringList.Items {
combinedList = append(combinedList, &pm)
}
var clusterPodMonitoringList monitoringv1.ClusterPodMonitoringList
if err := kubeClient.List(ctx, &clusterPodMonitoringList); err != nil {
return nil, err
}
for _, pm := range clusterPodMonitoringList.Items {
combinedList = append(combinedList, &pm)
}
return combinedList, nil
}

// shouldPoll reports whether polling the collectors is enabled in the OperatorConfig.
func shouldPoll(ctx context.Context, cfgNamespacedName types.NamespacedName, kubeClient client.Client) (bool, error) {
// Check if target status is enabled.
var config monitoringv1.OperatorConfig
if err := kubeClient.Get(ctx, cfgNamespacedName, &config); err != nil {
if apierrors.IsNotFound(err) {
return false, nil
}
return false, err
}
if !config.Features.TargetStatus.Enabled {
return false, nil
}
return true, nil
}

// Reconcile polls the collector pods, fetches and aggregates target status and
// upserts into each PodMonitoring's Status field.
func (r *targetStatusReconciler) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) {
timer := r.clock.NewTimer(minPollDuration)
now := time.Now()
cfgNamespacedName := types.NamespacedName{
Name: NameOperatorConfig,
Namespace: r.opts.PublicNamespace,
}
if should, err := shouldPoll(ctx, cfgNamespacedName, r.kubeClient); err != nil {
r.logger.Error(err, "should poll")
} else if should {
if err := pollAndUpdate(ctx, r.logger, r.opts, r.httpClient, r.getTarget, r.kubeClient); err != nil {
r.logger.Error(err, "poll and update")
} else {
// Only record the duration metric if target polling was successful.
duration := time.Since(now)
targetStatusDuration.WithLabelValues().Set(float64(duration.Milliseconds()))
}
}
// Check if we beat the timer, otherwise wait.
select {
case <-ctx.Done():
break
case <-timer.C():
r.ch <- event.GenericEvent{
Object: &appsv1.DaemonSet{},
}
}
return reconcile.Result{}, nil
}

// pollAndUpdate fetches the target status from each collector pod and updates
// the status of the matching PodMonitoring and ClusterPodMonitoring objects.
func pollAndUpdate(ctx context.Context, logger logr.Logger, opts Options, httpClient *http.Client, getTarget getTargetFn, kubeClient client.Client) error {
allPodMonitorings, err := fetchAllPodMonitorings(ctx, kubeClient)
if err != nil {
return err
}
if len(allPodMonitorings) == 0 {
// Nothing to update.
return nil
}
targets, err := fetchTargets(ctx, logger, opts, httpClient, getTarget, kubeClient)
if err != nil {
return err
}
return updateTargetStatus(ctx, logger, kubeClient, targets, allPodMonitorings)
}

// fetchTargets retrieves the Prometheus targets using the given target function
// for each collector pod.
func fetchTargets(ctx context.Context, logger logr.Logger, opts Options, httpClient *http.Client, getTarget getTargetFn, kubeClient client.Client) ([]*prometheusv1.TargetsResult, error) {
namespace := opts.OperatorNamespace
var ds appsv1.DaemonSet
if err := kubeClient.Get(ctx, client.ObjectKey{
Name: NameCollector,
Namespace: namespace,
}, &ds); err != nil {
return nil, err
}
selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector)
if err != nil {
return nil, err
}
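// Find the port of the Prometheus container in the collector DaemonSet spec.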
var port *int32
for _, container := range ds.Spec.Template.Spec.Containers {
if isPrometheusContainer(&container) {
port = getPrometheusPort(&container)
if port != nil {
break
}
}
}
if port == nil {
return nil, errors.New("unable to detect Prometheus port")
}
pods, err := getPrometheusPods(ctx, kubeClient, opts, selector)
if err != nil {
return nil, err
}
// Set up the pod discovery queue and worker jobs.
podDiscoveryCh := make(chan prometheusPod)
wg := sync.WaitGroup{}
wg.Add(int(opts.TargetPollConcurrency))
// Results are drained concurrently by the range loop below, so an
// unbuffered channel cannot deadlock the workers.
targetCh := make(chan *prometheusv1.TargetsResult)
for range opts.TargetPollConcurrency {
// Wrapper function so we can defer in this scope.
go func() {
defer wg.Done()
for prometheusPod := range podDiscoveryCh {
// Fetch operation is blocking.
target, err := getTarget(ctx, logger, httpClient, prometheusPod.port, prometheusPod.pod)
if err != nil {
logger.Error(err, "failed to fetch target", "pod", prometheusPod.pod.GetName())
}
// nil represents being unable to reach a target.
targetCh <- target
}
}()
}
// Sends on the unbuffered channels block, so feed the pod queue from a
// separate goroutine and make sure both channels are eventually closed.
go func() {
for _, pod := range pods {
podDiscoveryCh <- prometheusPod{
port: *port,
pod: pod,
}
}
// Must close so jobs aren't waiting on the channel indefinitely.
close(podDiscoveryCh)
// Close targetCh only after all workers have finished sending.
wg.Wait()
close(targetCh)
}()
results := make([]*prometheusv1.TargetsResult, 0)
for target := range targetCh {
results = append(results, target)
}
return results, nil
}
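
// patchPodMonitoringStatus merge-patches the given endpoint statuses into the
// status subresource of the provided (Cluster)PodMonitoring object.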
func patchPodMonitoringStatus(ctx context.Context, kubeClient client.Client, object client.Object, status *monitoringv1.PodMonitoringStatus) error {
patchStatus := map[string]interface{}{
"endpointStatuses": status.EndpointStatuses,
}
patchObject := map[string]interface{}{"status": patchStatus}
patchBytes, err := json.Marshal(patchObject)
if err != nil {
return fmt.Errorf("unable to marshall status: %w", err)
}
patch := client.RawPatch(types.MergePatchType, patchBytes)
if err := kubeClient.Status().Patch(ctx, object, patch); err != nil {
return fmt.Errorf("unable to patch status: %w", err)
}
return nil
}

// updateTargetStatus populates the status of each PodMonitoring and
// ClusterPodMonitoring object from the given Prometheus targets.
func updateTargetStatus(ctx context.Context, logger logr.Logger, kubeClient client.Client, targets []*prometheusv1.TargetsResult, podMonitorings []monitoringv1.PodMonitoringCRD) error {
endpointMap, err := buildEndpointStatuses(targets)
if err != nil {
return err
}
var errs []error
withStatuses := map[string]bool{}
for job, endpointStatuses := range endpointMap {
pm, err := getObjectByScrapeJobKey(job)
if err != nil {
errs = append(errs, fmt.Errorf("building target: %s: %w", job, err))
continue
}
if pm == nil {
// Skip hard-coded jobs which we do not patch.
continue
}
withStatuses[pm.GetName()] = true
pm.GetPodMonitoringStatus().EndpointStatuses = endpointStatuses
if err := patchPodMonitoringStatus(ctx, kubeClient, pm, pm.GetPodMonitoringStatus()); err != nil {
// Save and log any error encountered while patching the status.
// We don't want to prematurely return if the error was transient
// as we should continue patching all statuses before exiting.
errs = append(errs, err)
logger.Error(err, "patching status", "job", job, "gvk", pm.GetObjectKind().GroupVersionKind())
}
}
// Any pod monitorings that exist but don't have endpoints should also be updated.
for _, pm := range podMonitorings {
if _, exists := withStatuses[pm.GetName()]; !exists {
pm.GetPodMonitoringStatus().EndpointStatuses = []monitoringv1.ScrapeEndpointStatus{}
if err := patchPodMonitoringStatus(ctx, kubeClient, pm, pm.GetPodMonitoringStatus()); err != nil {
// Same reasoning as above for error handling.
errs = append(errs, err)
logger.Error(err, "patching empty status", "pm", pm.GetName(), "gvk", pm.GetObjectKind().GroupVersionKind())
}
}
}
return errors.Join(errs...)
}
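
// getPrometheusPods lists the collector pods in the operator namespace that
// match the given selector and run a Prometheus container.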
func getPrometheusPods(ctx context.Context, kubeClient client.Client, opts Options, selector labels.Selector) ([]*corev1.Pod, error) {
var podList corev1.PodList
if err := kubeClient.List(ctx, &podList, client.InNamespace(opts.OperatorNamespace), client.MatchingLabelsSelector{
Selector: selector,
}); err != nil {
return nil, err
}
pods := podList.Items
podsFiltered := make([]*corev1.Pod, 0)
for _, pod := range pods {
if isPrometheusPod(&pod) {
podsFiltered = append(podsFiltered, pod.DeepCopy())
}
}
return podsFiltered, nil
}
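
// getTarget queries the Prometheus targets API of a single collector pod.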
func getTarget(ctx context.Context, _ logr.Logger, httpClient *http.Client, port int32, pod *corev1.Pod) (*prometheusv1.TargetsResult, error) {
if pod.Status.PodIP == "" {
return nil, errors.New("pod does not have IP allocated")
}
podURL := fmt.Sprintf("http://%s:%d", pod.Status.PodIP, port)
client, err := api.NewClient(api.Config{
Address: podURL,
Client: httpClient,
})
if err != nil {
return nil, fmt.Errorf("unable to create Prometheus client: %w", err)
}
v1api := prometheusv1.NewAPI(client)
targetsResult, err := v1api.Targets(ctx)
if err != nil {
return nil, fmt.Errorf("unable to fetch targets: %w", err)
}
return &targetsResult, nil
}
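
// prometheusPod pairs a collector pod with the port its Prometheus server
// listens on.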
type prometheusPod struct {
port int32
pod *corev1.Pod
}
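
// isPrometheusPod reports whether the pod runs a Prometheus collector container.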
func isPrometheusPod(pod *corev1.Pod) bool {
for _, container := range pod.Spec.Containers {
if isPrometheusContainer(&container) {
return true
}
}
return false
}
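
// isPrometheusContainer reports whether the container is the collector's Prometheus container.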
func isPrometheusContainer(container *corev1.Container) bool {
return container.Name == CollectorPrometheusContainerName
}
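
// getPrometheusPort returns the container port named as the collector's
// Prometheus port, or nil if none is found.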
func getPrometheusPort(container *corev1.Container) *int32 {
for _, containerPort := range container.Ports {
// In the future, we could fall back to reading the command line args.
if containerPort.Name == CollectorPrometheusContainerPortName {
// Make a copy.
return ptr.To(containerPort.ContainerPort)
}
}
return nil
}