pkg/platform/operator.go (202 lines of code) (raw):
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package platform
import (
"context"
"fmt"
"os"
"strings"
camelv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
"github.com/apache/camel-k/v2/pkg/util/defaults"
coordination "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"github.com/apache/camel-k/v2/pkg/util/log"
ctrl "sigs.k8s.io/controller-runtime/pkg/client"
)
const (
OperatorWatchNamespaceEnvVariable = "WATCH_NAMESPACE"
operatorNamespaceEnvVariable = "NAMESPACE"
operatorPodNameEnvVariable = "POD_NAME"
)
const OperatorLockName = "camel-k-lock"
var OperatorImage string
// IsCurrentOperatorGlobal returns true if the operator is configured to watch all namespaces.
func IsCurrentOperatorGlobal() bool {
if watchNamespace, envSet := os.LookupEnv(OperatorWatchNamespaceEnvVariable); !envSet || strings.TrimSpace(watchNamespace) == "" {
log.Debug("Operator is global to all namespaces")
return true
}
log.Debug("Operator is local to namespace")
return false
}
// GetOperatorPod returns the Pod which is running the operator in a given namespace.
func GetOperatorPod(ctx context.Context, c ctrl.Reader, ns string) *corev1.Pod {
lst := corev1.PodList{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: corev1.SchemeGroupVersion.String(),
},
}
if err := c.List(ctx, &lst,
ctrl.InNamespace(ns),
ctrl.MatchingLabels{
"camel.apache.org/component": "operator",
}); err != nil {
return nil
}
if len(lst.Items) == 0 {
return nil
}
return &lst.Items[0]
}
// GetOperatorWatchNamespace returns the namespace the operator watches.
func GetOperatorWatchNamespace() string {
if namespace, envSet := os.LookupEnv(OperatorWatchNamespaceEnvVariable); envSet {
return namespace
}
return ""
}
// GetOperatorNamespace returns the namespace where the current operator is located (if set).
func GetOperatorNamespace() string {
if podNamespace, envSet := os.LookupEnv(operatorNamespaceEnvVariable); envSet {
return podNamespace
}
return ""
}
// GetOperatorPodName returns the pod that is running the current operator (if any).
func GetOperatorPodName() string {
if podName, envSet := os.LookupEnv(operatorPodNameEnvVariable); envSet {
return podName
}
return ""
}
// GetOperatorLockName returns the name of the lock lease that is electing a leader on the particular namespace.
func GetOperatorLockName(operatorID string) string {
return fmt.Sprintf("%s-lock", operatorID)
}
// IsNamespaceLocked tells if the namespace contains a lock indicating that an operator owns it.
func IsNamespaceLocked(ctx context.Context, c ctrl.Reader, namespace string) (bool, error) {
if namespace == "" {
return false, nil
}
platforms, err := ListPlatforms(ctx, c, namespace)
if err != nil {
return true, err
}
for _, platform := range platforms.Items {
lease := coordination.Lease{}
var operatorLockName string
if platform.Name != "" {
operatorLockName = GetOperatorLockName(platform.Name)
} else {
operatorLockName = OperatorLockName
}
if err := c.Get(ctx, ctrl.ObjectKey{Namespace: namespace, Name: operatorLockName}, &lease); err == nil || !k8serrors.IsNotFound(err) {
return true, err
}
}
return false, nil
}
// IsOperatorAllowedOnNamespace returns true if the current operator is allowed to react on changes in the given namespace.
func IsOperatorAllowedOnNamespace(ctx context.Context, c ctrl.Reader, namespace string) (bool, error) {
// allow all local operators
if !IsCurrentOperatorGlobal() {
return true, nil
}
// allow global operators that use a proper operator id
if defaults.OperatorID() != "" {
log.Debugf("Operator ID: %s", defaults.OperatorID())
return true, nil
}
operatorNamespace := GetOperatorNamespace()
if operatorNamespace == namespace {
// Global operator is allowed on its own namespace
return true, nil
}
alreadyOwned, err := IsNamespaceLocked(ctx, c, namespace)
if err != nil {
log.Debugf("Error occurred while testing whether namespace is locked: %v", err)
return false, err
}
log.Debugf("Lock status of namespace %s: %t", namespace, alreadyOwned)
return !alreadyOwned, nil
}
// IsOperatorHandler checks on resource operator id annotation and this operator instance id.
// Operators matching the annotation operator id are allowed to reconcile.
// For legacy resources that are missing a proper operator id annotation the default global operator or the local
// operator in this namespace are candidates for reconciliation.
func IsOperatorHandler(object ctrl.Object) bool {
if object == nil {
return true
}
resourceID := camelv1.GetOperatorIDAnnotation(object)
operatorID := defaults.OperatorID()
// allow operator with matching id to handle the resource
if resourceID == operatorID {
return true
}
// check if we are dealing with resource that is missing a proper operator id annotation
if resourceID == "" {
// allow default global operator to handle legacy resources (missing proper operator id annotations)
if operatorID == DefaultPlatformName {
return true
}
// allow local operators to handle legacy resources (missing proper operator id annotations)
if !IsCurrentOperatorGlobal() {
return true
}
}
return false
}
// IsOperatorHandlerConsideringLock uses normal IsOperatorHandler checks and adds additional check for legacy resources
// that are missing a proper operator id annotation. In general two kind of operators race for reconcile these legacy resources.
// The local operator for this namespace and the default global operator instance. Based on the existence of a namespace
// lock the current local operator has precedence. When no lock exists the default global operator should reconcile.
func IsOperatorHandlerConsideringLock(ctx context.Context, c ctrl.Reader, namespace string, object ctrl.Object) bool {
isHandler := IsOperatorHandler(object)
if !isHandler {
return false
}
resourceID := camelv1.GetOperatorIDAnnotation(object)
// add additional check on resources missing an operator id
if resourceID == "" {
operatorNamespace := GetOperatorNamespace()
if operatorNamespace == namespace {
// Global operator is allowed on its own namespace
return true
}
if locked, err := IsNamespaceLocked(ctx, c, namespace); err != nil || locked {
// namespace is locked so local operators do have precedence
return !IsCurrentOperatorGlobal()
}
}
return true
}
// FilteringFuncs do preliminary checks to determine if certain events should be handled by the controller
// based on labels on the resources (e.g. camel.apache.org/operator.id) and the operator configuration,
// before handing the computation over to the user code.
type FilteringFuncs[T ctrl.Object] struct {
// Create returns true if the Create event should be processed
CreateFunc func(event.TypedCreateEvent[T]) bool
// Delete returns true if the Delete event should be processed
DeleteFunc func(event.TypedDeleteEvent[T]) bool
// Update returns true if the Update event should be processed
UpdateFunc func(event.TypedUpdateEvent[T]) bool
// Generic returns true if the Generic event should be processed
GenericFunc func(event.TypedGenericEvent[T]) bool
}
func (f FilteringFuncs[T]) Create(e event.TypedCreateEvent[T]) bool {
if !IsOperatorHandler(e.Object) {
return false
}
if f.CreateFunc != nil {
return f.CreateFunc(e)
}
return true
}
func (f FilteringFuncs[T]) Delete(e event.TypedDeleteEvent[T]) bool {
if !IsOperatorHandler(e.Object) {
return false
}
if f.DeleteFunc != nil {
return f.DeleteFunc(e)
}
return true
}
func (f FilteringFuncs[T]) Update(e event.TypedUpdateEvent[T]) bool {
if !IsOperatorHandler(e.ObjectNew) {
return false
}
if camelv1.GetOperatorIDAnnotation(e.ObjectOld) != camelv1.GetOperatorIDAnnotation(e.ObjectNew) {
// Always force reconciliation when the object becomes managed by the current operator
return true
}
if camelv1.GetIntegrationProfileAnnotation(e.ObjectOld) != camelv1.GetIntegrationProfileAnnotation(e.ObjectNew) {
// Always force reconciliation when the object gets attached to a new integration profile
return true
}
if camelv1.GetIntegrationProfileNamespaceAnnotation(e.ObjectOld) != camelv1.GetIntegrationProfileNamespaceAnnotation(e.ObjectNew) {
// Always force reconciliation when the object gets attached to a new integration profile
return true
}
if f.UpdateFunc != nil {
return f.UpdateFunc(e)
}
return true
}
func (f FilteringFuncs[T]) Generic(e event.TypedGenericEvent[T]) bool {
if !IsOperatorHandler(e.Object) {
return false
}
if f.GenericFunc != nil {
return f.GenericFunc(e)
}
return true
}
var _ predicate.Predicate = FilteringFuncs[ctrl.Object]{}