pkg/controller/integration/monitor.go

/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package integration

import (
	"context"
	"errors"
	"fmt"
	"reflect"
	"strconv"
	"strings"

	appsv1 "k8s.io/api/apps/v1"
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/selection"
	"k8s.io/utils/ptr"

	ctrl "sigs.k8s.io/controller-runtime/pkg/client"

	servingv1 "knative.dev/serving/pkg/apis/serving/v1"

	v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
	"github.com/apache/camel-k/v2/pkg/client"
	"github.com/apache/camel-k/v2/pkg/trait"
	"github.com/apache/camel-k/v2/pkg/util"
	"github.com/apache/camel-k/v2/pkg/util/digest"
	"github.com/apache/camel-k/v2/pkg/util/kubernetes"
	utilResource "github.com/apache/camel-k/v2/pkg/util/resource"
)

// NewMonitorAction is an action used to monitor managed Integrations.
func NewMonitorAction() Action {
	return &monitorAction{}
}

type monitorAction struct {
	baseAction
}

func (action *monitorAction) Name() string {
	return "monitor"
}

func (action *monitorAction) CanHandle(integration *v1.Integration) bool {
	return integration.Status.Phase == v1.IntegrationPhaseDeploying ||
		integration.Status.Phase == v1.IntegrationPhaseRunning ||
		integration.Status.Phase == v1.IntegrationPhaseError
}

//nolint:nestif
func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integration) (*v1.Integration, error) {
	// When in the InitializationFailed condition, a kit is not available for the integration,
	// so handle it differently from the rest.
	if isInInitializationFailed(integration.Status) {
		// Only check if the Integration requires a rebuild
		return action.checkDigestAndRebuild(ctx, integration, nil)
	}

	var kit *v1.IntegrationKit
	var err error
	if integration.Status.IntegrationKit == nil && integration.Status.Image == "" {
		return nil, fmt.Errorf("no kit nor container image set on integration %s", integration.Name)
	}
	if integration.Status.IntegrationKit != nil {
		// Managed Integration
		kit, err = kubernetes.GetIntegrationKit(ctx, action.client,
			integration.Status.IntegrationKit.Name, integration.Status.IntegrationKit.Namespace)
		if err != nil {
			return nil, fmt.Errorf("unable to find integration kit %s/%s: %w",
				integration.Status.IntegrationKit.Namespace, integration.Status.IntegrationKit.Name, err)
		}
		// If the integration is in error and its kit is also in error, then the integration does not change
		if isInIntegrationKitFailed(integration.Status) &&
			kit.Status.Phase == v1.IntegrationKitPhaseError {
			return nil, nil
		}
	}

	// Check if the Integration requires a rebuild
	if changed, err := action.checkDigestAndRebuild(ctx, integration, kit); err != nil {
		return nil, err
	} else if changed != nil {
		return changed, nil
	}

	if kit != nil {
		// Check if an IntegrationKit with higher priority is ready
		priority, ok := kit.Labels[v1.IntegrationKitPriorityLabel]
		if !ok {
			priority = "0"
		}
		withHigherPriority, err := labels.NewRequirement(v1.IntegrationKitPriorityLabel, selection.GreaterThan, []string{priority})
		if err != nil {
			return nil, err
		}
		kits, err := lookupKitsForIntegration(ctx, action.client, integration, ctrl.MatchingLabelsSelector{
			Selector: labels.NewSelector().Add(*withHigherPriority),
		})
		if err != nil {
			return nil, err
		}
		priorityReadyKit, err := findHighestPriorityReadyKit(kits)
		if err != nil {
			return nil, err
		}
		if priorityReadyKit != nil {
			integration.SetIntegrationKit(priorityReadyKit)
		}
	}

	// Run traits that are enabled for the phase
	environment, err := trait.Apply(ctx, action.client, integration, kit)
	if err != nil {
		integration.Status.Phase = v1.IntegrationPhaseError
		integration.SetReadyCondition(corev1.ConditionFalse,
			v1.IntegrationConditionInitializationFailedReason, err.Error())
		return integration, err
	}

	// If the platform is not in a ready status (which may happen when a new IntegrationPlatform is created),
	// then we may not be able to properly apply all the traits. We must set the phase to an unknown status,
	// which is periodically reconciled, in order to make sure that we eventually return to a ready phase
	// (likely once the platform is done).
	if environment.Platform == nil || environment.Platform.Status.Phase != v1.IntegrationPlatformPhaseReady {
		integration.Status.Phase = v1.IntegrationPhaseUnknown
		integration.Status.SetCondition(
			v1.IntegrationConditionPlatformAvailable,
			corev1.ConditionFalse,
			"PlatformMissing",
			"IntegrationPlatform is missing or not yet ready. If the problem persists, make sure to fix the IntegrationPlatform error or create a new one.",
		)

		return integration, nil
	}

	action.checkTraitAnnotationsDeprecatedNotice(integration)

	return action.monitorPods(ctx, environment, integration)
}
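// Illustrative note (not part of the reconciliation logic): the deprecated
// annotation-based trait configuration checked below uses annotation keys carrying
// the v1.TraitAnnotationPrefix, for example (assuming the container trait is in use):
//
//	metadata:
//	  annotations:
//	    trait.camel.apache.org/container.limit-cpu: "500m"
//
// The supported equivalent lives under .spec.traits in the Integration spec.
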
// Deprecated: to be removed in future versions, when trait annotations on Integrations are no longer supported.
func (action *monitorAction) checkTraitAnnotationsDeprecatedNotice(integration *v1.Integration) {
	if integration.Annotations != nil {
		for k := range integration.Annotations {
			if strings.HasPrefix(k, v1.TraitAnnotationPrefix) {
				integration.Status.SetCondition(
					v1.IntegrationConditionType("AnnotationTraitsDeprecated"),
					corev1.ConditionTrue,
					"DeprecationNotice",
					"Annotation traits configuration is deprecated and will be removed soon. Use .spec.traits configuration instead.",
				)
				action.L.Infof(
					"WARN: annotation traits configuration is deprecated and will be removed soon. Use .spec.traits configuration for %s integration instead.",
					integration.Name,
				)
				return
			}
		}
	}
}

func (action *monitorAction) monitorPods(ctx context.Context, environment *trait.Environment, integration *v1.Integration) (*v1.Integration, error) {
	controller, err := action.newController(environment, integration)
	if err != nil {
		return nil, err
	}
	// In order to simplify the monitoring and keep the resource requirements low, we only watch those Pods
	// which are labeled with `camel.apache.org/integration`. This is a design choice that requires the user to
	// voluntarily add the label to their Pods (via the template, typically) in order to monitor non-managed Camel applications.
	if !controller.hasTemplateIntegrationLabel() {
		// This happens when the Deployment, CronJob, etc. resource is missing
		// the Integration label, which is required to identify sibling Pods.
		integration.Status.SetConditions(
			v1.IntegrationCondition{
				Type:   v1.IntegrationConditionReady,
				Status: corev1.ConditionFalse,
				Reason: v1.IntegrationConditionMonitoringPodsAvailableReason,
				Message: fmt.Sprintf(
					"Could not find `camel.apache.org/integration: %s` label in the %s template. Make sure to include this label in the template for Pod monitoring purposes.",
					integration.GetName(),
					controller.getControllerName(),
				),
			},
		)
		return integration, nil
	}
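	// Illustrative sketch (not part of the controller logic): for a non-managed Camel
	// application, the user is expected to add the Integration label to the Pod template,
	// e.g. in a Deployment spec, assuming an Integration named "my-integration":
	//
	//	template:
	//	  metadata:
	//	    labels:
	//	      camel.apache.org/integration: my-integration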
	// Enforce the scale sub-resource label selector.
	// It is used by the HPA that queries the scale sub-resource endpoint,
	// to list the pods owned by the integration.
	integration.Status.Selector = v1.IntegrationLabel + "=" + integration.Name

	// Update the replicas count
	pendingPods := &corev1.PodList{}
	err = action.client.List(ctx, pendingPods,
		ctrl.InNamespace(integration.Namespace),
		ctrl.MatchingLabels{v1.IntegrationLabel: integration.Name},
		ctrl.MatchingFields{"status.phase": string(corev1.PodPending)})
	if err != nil {
		return nil, err
	}
	runningPods := &corev1.PodList{}
	err = action.client.List(ctx, runningPods,
		ctrl.InNamespace(integration.Namespace),
		ctrl.MatchingLabels{v1.IntegrationLabel: integration.Name},
		ctrl.MatchingFields{"status.phase": string(corev1.PodRunning)})
	if err != nil {
		return nil, err
	}
	nonTerminatingPods := 0
	for _, pod := range runningPods.Items {
		if pod.DeletionTimestamp != nil {
			continue
		}
		nonTerminatingPods++
	}
	podCount := len(pendingPods.Items) + nonTerminatingPods
	replicas, err := util.IToInt32(podCount)
	if err != nil {
		return nil, err
	}
	integration.Status.Replicas = replicas

	// Reconcile Integration phase and ready condition
	if integration.Status.Phase == v1.IntegrationPhaseDeploying {
		integration.Status.Phase = v1.IntegrationPhaseRunning
	}
	if err = action.updateIntegrationPhaseAndReadyCondition(
		ctx, controller, environment, integration, pendingPods.Items, runningPods.Items,
	); err != nil {
		return nil, err
	}

	return integration, nil
}

func isInInitializationFailed(status v1.IntegrationStatus) bool {
	if status.Phase != v1.IntegrationPhaseError {
		return false
	}
	if cond := status.GetCondition(v1.IntegrationConditionReady); cond != nil {
		if cond.Status == corev1.ConditionFalse &&
			cond.Reason == v1.IntegrationConditionInitializationFailedReason {
			return true
		}
	}

	return false
}

func isInIntegrationKitFailed(status v1.IntegrationStatus) bool {
	if cond := status.GetCondition(v1.IntegrationConditionKitAvailable); cond != nil {
		if cond.Status == corev1.ConditionFalse &&
			status.Phase != v1.IntegrationPhaseError {
			return true
		}
	}

	return false
}
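// Note on the digest check below: the computed digest covers the Integration
// definition together with the resource versions of the configmaps and secrets
// mounted with hot reload enabled, so a change to any of them resets the
// Integration status and sends it through initialization again.
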
func (action *monitorAction) checkDigestAndRebuild(ctx context.Context, integration *v1.Integration, kit *v1.IntegrationKit) (*v1.Integration, error) {
	secrets, configmaps := getIntegrationSecretAndConfigmapResourceVersions(ctx, action.client, integration)
	hash, err := digest.ComputeForIntegration(integration, configmaps, secrets)
	if err != nil {
		return nil, err
	}

	if hash != integration.Status.Digest {
		action.L.Infof("Integration %s digest has changed: resetting its status. Will check if it needs to be rebuilt and restarted.", integration.Name)
		if isIntegrationKitResetRequired(integration, kit) {
			integration.SetIntegrationKit(nil)
		}
		integration.Initialize()
		integration.Status.Digest = hash

		return integration, nil
	}

	return nil, nil
}

func isIntegrationKitResetRequired(integration *v1.Integration, kit *v1.IntegrationKit) bool {
	if kit == nil {
		return false
	}

	if v1.GetOperatorIDAnnotation(integration) != "" &&
		v1.GetOperatorIDAnnotation(integration) != v1.GetOperatorIDAnnotation(kit) {
		// The operator reconciling the integration has changed. Reset the integration kit
		// so the new operator can handle the kit reference.
		return true
	}

	if v1.GetIntegrationProfileAnnotation(integration) != "" &&
		v1.GetIntegrationProfileAnnotation(integration) != v1.GetIntegrationProfileAnnotation(kit) {
		// The integration profile for the integration has changed. Reset the integration kit
		// so the new profile can be applied.
		return true
	}

	if v1.GetIntegrationProfileNamespaceAnnotation(integration) != "" &&
		v1.GetIntegrationProfileNamespaceAnnotation(integration) != v1.GetIntegrationProfileNamespaceAnnotation(kit) {
		// The integration profile namespace for the integration has changed. Reset the integration kit
		// so the new profile can be applied.
		return true
	}

	return false
}

// getIntegrationSecretAndConfigmapResourceVersions returns the lists of secret and configmap
// resource versions, which are only useful to watch for changes.
func getIntegrationSecretAndConfigmapResourceVersions(ctx context.Context, client client.Client, integration *v1.Integration) ([]string, []string) {
	configmaps := make([]string, 0)
	secrets := make([]string, 0)
	if integration.Spec.Traits.Mount != nil && ptr.Deref(integration.Spec.Traits.Mount.HotReload, false) {
		mergedResources := make([]string, 0)
		mergedResources = append(mergedResources, integration.Spec.Traits.Mount.Configs...)
		mergedResources = append(mergedResources, integration.Spec.Traits.Mount.Resources...)
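		// Each entry is expected in the syntax understood by utilResource.ParseConfig,
		// typically "configmap:<name>" or "secret:<name>" (illustrative values); entries
		// that fail to parse, or that use a different storage type, are skipped.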
		for _, c := range mergedResources {
			if conf, parseErr := utilResource.ParseConfig(c); parseErr == nil {
				if conf.StorageType() == utilResource.StorageTypeConfigmap {
					cm := corev1.ConfigMap{
						TypeMeta: metav1.TypeMeta{
							Kind:       "ConfigMap",
							APIVersion: corev1.SchemeGroupVersion.String(),
						},
						ObjectMeta: metav1.ObjectMeta{
							Namespace: integration.Namespace,
							Name:      conf.Name(),
						},
					}
					configmaps = append(configmaps, kubernetes.LookupResourceVersion(ctx, client, &cm))
				} else if conf.StorageType() == utilResource.StorageTypeSecret {
					sec := corev1.Secret{
						TypeMeta: metav1.TypeMeta{
							Kind:       "Secret",
							APIVersion: corev1.SchemeGroupVersion.String(),
						},
						ObjectMeta: metav1.ObjectMeta{
							Namespace: integration.Namespace,
							Name:      conf.Name(),
						},
					}
					secrets = append(secrets, kubernetes.LookupResourceVersion(ctx, client, &sec))
				}
			}
		}
	}

	return secrets, configmaps
}

type controller interface {
	checkReadyCondition(ctx context.Context) (bool, error)
	updateReadyCondition(readyPods int32) bool
	hasTemplateIntegrationLabel() bool
	getControllerName() string
}

func (action *monitorAction) newController(env *trait.Environment, integration *v1.Integration) (controller, error) {
	var controller controller
	var obj ctrl.Object
	switch {
	case integration.IsConditionTrue(v1.IntegrationConditionDeploymentAvailable):
		obj = getUpdatedController(env, &appsv1.Deployment{})
		deploy, ok := obj.(*appsv1.Deployment)
		if !ok {
			return nil, fmt.Errorf("type assertion failed, not a Deployment: %v", obj)
		}
		controller = &deploymentController{
			obj:         deploy,
			integration: integration,
		}
	case integration.IsConditionTrue(v1.IntegrationConditionKnativeServiceAvailable):
		obj = getUpdatedController(env, &servingv1.Service{})
		svc, ok := obj.(*servingv1.Service)
		if !ok {
			return nil, fmt.Errorf("type assertion failed, not a Knative Service: %v", obj)
		}
		controller = &knativeServiceController{
			obj:         svc,
			integration: integration,
		}
	case integration.IsConditionTrue(v1.IntegrationConditionCronJobAvailable):
		obj = getUpdatedController(env, &batchv1.CronJob{})
		cj, ok := obj.(*batchv1.CronJob)
		if !ok {
			return nil, fmt.Errorf("type assertion failed, not a CronJob: %v", obj)
		}
		controller = &cronJobController{
			obj:         cj,
			integration: integration,
			client:      action.client,
		}
	default:
		return nil, fmt.Errorf("unsupported controller for integration %s", integration.Name)
	}

	if obj == nil {
		return nil, fmt.Errorf("unable to retrieve controller for integration %s", integration.Name)
	}

	return controller, nil
}

// getUpdatedController retrieves the controller updated from the deployer trait execution.
func getUpdatedController(env *trait.Environment, obj ctrl.Object) ctrl.Object {
	return env.Resources.GetController(func(object ctrl.Object) bool {
		return reflect.TypeOf(obj) == reflect.TypeOf(object)
	})
}
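// updateIntegrationPhaseAndReadyCondition reconciles the Integration phase and Ready
// condition from the controller resource and the observed Pods: it first delegates to the
// controller-specific readiness check, then inspects Pod and container statuses and the
// readiness probes, moving the Integration to the Error phase on failures, or back to
// Running once the controller reports enough ready Pods.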
func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(
	ctx context.Context, controller controller, environment *trait.Environment, integration *v1.Integration,
	pendingPods []corev1.Pod, runningPods []corev1.Pod,
) error {
	if done, err := controller.checkReadyCondition(ctx); done || err != nil {
		// There may be pods that are not ready, but they can still be probed for error messages.
		// Ignore the error returned from probing, as it is expected when the controller object is not ready.
		_, _, _ = action.probeReadiness(ctx, environment, integration, runningPods)

		return err
	}
	if arePodsFailingStatuses(integration, pendingPods, runningPods) {
		return nil
	}

	readyPods, probeOk, err := action.probeReadiness(ctx, environment, integration, runningPods)
	if err != nil {
		return err
	}
	if !probeOk {
		integration.Status.Phase = v1.IntegrationPhaseError
		return nil
	}
	if done := controller.updateReadyCondition(readyPods); done {
		integration.Status.Phase = v1.IntegrationPhaseRunning
		return nil
	}

	return nil
}
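// arePodsFailingStatuses reports whether any of the observed Pods is in a failing state
// (Unschedulable, ImagePullBackOff, CrashLoopBackOff, or a container terminated in error);
// in that case it also moves the Integration to the Error phase and records the failure
// message in the Ready condition.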
func arePodsFailingStatuses(integration *v1.Integration, pendingPods []corev1.Pod, runningPods []corev1.Pod) bool {
	// Check Pods statuses
	for _, pod := range pendingPods {
		// Check the scheduled condition
		if scheduled := kubernetes.GetPodCondition(pod, corev1.PodScheduled); scheduled != nil &&
			scheduled.Status == corev1.ConditionFalse &&
			scheduled.Reason == "Unschedulable" {
			integration.Status.Phase = v1.IntegrationPhaseError
			integration.SetReadyConditionError(scheduled.Message)

			return true
		}
	}
	// Check pending container statuses
	for _, pod := range pendingPods {
		var containers []corev1.ContainerStatus
		containers = append(containers, pod.Status.InitContainerStatuses...)
		containers = append(containers, pod.Status.ContainerStatuses...)
		for _, container := range containers {
			// Check whether the images can be pulled
			if waiting := container.State.Waiting; waiting != nil && waiting.Reason == "ImagePullBackOff" {
				integration.Status.Phase = v1.IntegrationPhaseError
				integration.SetReadyConditionError(waiting.Message)

				return true
			}
		}
	}
	// Check running container statuses
	for _, pod := range runningPods {
		if pod.DeletionTimestamp != nil {
			continue
		}
		var containers []corev1.ContainerStatus
		containers = append(containers, pod.Status.InitContainerStatuses...)
		containers = append(containers, pod.Status.ContainerStatuses...)
		for _, container := range containers {
			// Check the container state
			if waiting := container.State.Waiting; waiting != nil && waiting.Reason == "CrashLoopBackOff" {
				integration.Status.Phase = v1.IntegrationPhaseError
				integration.SetReadyConditionError(waiting.Message)

				return true
			}
			if terminated := container.State.Terminated; terminated != nil && terminated.Reason == "Error" {
				integration.Status.Phase = v1.IntegrationPhaseError
				integration.SetReadyConditionError(terminated.Message)

				return true
			}
		}
	}

	return false
}

// probeReadiness calls the readiness probes of the non-ready Pods directly to retrieve insights from the Camel runtime.
// It returns the number of ready Pods, whether the probe succeeded, and any error that may have happened during its execution.
func (action *monitorAction) probeReadiness(ctx context.Context, environment *trait.Environment, integration *v1.Integration, pods []corev1.Pod) (int32, bool, error) {
	// As a default we assume the Integration is Ready
	readyCondition := v1.IntegrationCondition{
		Type:   v1.IntegrationConditionReady,
		Status: corev1.ConditionTrue,
		Pods:   make([]v1.PodCondition, len(pods)),
	}

	readyPods := int32(0)
	unreadyPods := int32(0)

	runtimeReady := true
	runtimeFailed := false
	probeReadinessOk := true

	for i := range pods {
		pod := &pods[i]
		readyCondition.Pods[i].Name = pod.Name

		for p := range pod.Status.Conditions {
			if pod.Status.Conditions[p].Type == corev1.PodReady {
				readyCondition.Pods[i].Condition = pod.Status.Conditions[p]
				break
			}
		}

		// If it's in a ready status, then we don't care to probe.
		if ready := kubernetes.GetPodCondition(*pod, corev1.PodReady); ready.Status == corev1.ConditionTrue {
			readyPods++
			continue
		}
		unreadyPods++

		container := getIntegrationContainer(environment, pod)
		if container == nil {
			return readyPods, false, fmt.Errorf("integration container not found in Pod %s/%s", pod.Namespace, pod.Name)
		}

		// TODO: this code must be moved to a dedicated function
		//
		//nolint:nestif
		if probe := container.ReadinessProbe; probe != nil && probe.HTTPGet != nil {
			body, err := proxyGetHTTPProbe(ctx, action.client, probe, pod, container)
			// When invoking the HTTP probe, the kubernetes client exposes a very
			// specific behavior:
			//
			// - if there is no error, that means the pod is not ready just because
			//   the probe has to be called a few times as per its configuration;
			//   the probe is OK, and the pod could become ready at some point
			// - if the error is Service Unavailable (HTTP 503), then it means the pod
			//   is not ready and the probe is failing; in this case we can use the
			//   response to scrape for camel info
			//
			// Here is an example of a failed probe (from curl):
			//
			//   Trying 127.0.0.1:8080...
			//   TCP_NODELAY set
			//   Connected to localhost (127.0.0.1) port 8080 (#0)
			//   GET /q/health/ready HTTP/1.1
			//   Host: localhost:8080
			//   User-Agent: curl/7.68.0
			//   Accept: */*
			//
			//   Mark bundle as not supporting multiuse
			//   HTTP/1.1 503 Service Unavailable
			//   content-type: application/json; charset=UTF-8
			//   content-length: 871
			//
			//   {
			//     "status": "DOWN",
			//     "checks": [ {
			//       "name": "camel-routes",
			//       "status": "DOWN",
			//       "data": {
			//         "route.id": "route1",
			//         "route.status": "Stopped",
			//         "check.kind": "READINESS"
			//       }
			//     }]
			//   }
			if err == nil {
				continue
			}
			if errors.Is(err, context.DeadlineExceeded) {
				readyCondition.Pods[i].Condition.Message = fmt.Sprintf("readiness probe timed out for Pod %s/%s", pod.Namespace, pod.Name)
				runtimeReady = false
				continue
			}
			if !k8serrors.IsServiceUnavailable(err) {
				readyCondition.Pods[i].Condition.Message = fmt.Sprintf("readiness probe failed for Pod %s/%s: %s", pod.Namespace, pod.Name, err.Error())
				runtimeReady = false
				continue
			}

			health, err := NewHealthCheck(body)
			if err != nil {
				return readyPods, false, err
			}
			for _, check := range health.Checks {
				if check.Status == v1.HealthCheckStatusUp {
					continue
				}

				runtimeReady = false
				runtimeFailed = true

				readyCondition.Pods[i].Health = append(readyCondition.Pods[i].Health, check)
			}
		}
	}

	if runtimeFailed {
		probeReadinessOk = false
		readyCondition.Reason = v1.IntegrationConditionErrorReason
		readyCondition.Status = corev1.ConditionFalse
		readyCondition.Message = fmt.Sprintf("%d/%d pods are not ready", unreadyPods, unreadyPods+readyPods)
		integration.Status.SetConditions(readyCondition)
	}
	if !runtimeReady {
		probeReadinessOk = false
		readyCondition.Reason = v1.IntegrationConditionRuntimeNotReadyReason
		readyCondition.Status = corev1.ConditionFalse
		readyCondition.Message = fmt.Sprintf("%d/%d pods are not ready", unreadyPods, unreadyPods+readyPods)
		integration.Status.SetConditions(readyCondition)
	}

	return readyPods, probeReadinessOk, nil
}

func findHighestPriorityReadyKit(kits []v1.IntegrationKit) (*v1.IntegrationKit, error) {
	if len(kits) == 0 {
		return nil, nil
	}
	var kit *v1.IntegrationKit
	priority := 0
	for i, k := range kits {
		if k.Status.Phase != v1.IntegrationKitPhaseReady {
			continue
		}
		p, err := strconv.Atoi(k.Labels[v1.IntegrationKitPriorityLabel])
		if err != nil {
			return nil, err
		}
		if p > priority {
			kit = &kits[i]
			priority = p
		}
	}

	return kit, nil
}

func getIntegrationContainer(environment *trait.Environment, pod *corev1.Pod) *corev1.Container {
	name := environment.GetIntegrationContainerName()
	for i, container := range pod.Spec.Containers {
		if container.Name == name {
			return &pod.Spec.Containers[i]
		}
	}

	return nil
}