pkg/cmd/promote.go (484 lines of code) (raw):

/* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package cmd import ( "context" "errors" "fmt" "reflect" "regexp" "strings" v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" traitv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1/trait" "github.com/apache/camel-k/v2/pkg/client" "github.com/apache/camel-k/v2/pkg/trait" "github.com/apache/camel-k/v2/pkg/util/camel" "github.com/apache/camel-k/v2/pkg/util/kamelets" "github.com/apache/camel-k/v2/pkg/util/kubernetes" "github.com/apache/camel-k/v2/pkg/util/resource" "github.com/spf13/cobra" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8sclient "sigs.k8s.io/controller-runtime/pkg/client" ) var namedConfRegExp = regexp.MustCompile("([a-z0-9-.]+)/.*") // newCmdPromote --. func newCmdPromote(rootCmdOptions *RootCmdOptions) (*cobra.Command, *promoteCmdOptions) { options := promoteCmdOptions{ RootCmdOptions: rootCmdOptions, } cmd := cobra.Command{ Use: "promote my-it [--to <namespace>] [-x <promoted-operator-id>]", Short: "Promote an Integration/Pipe from an environment to another", Long: "Promote an Integration/Pipe from an environment to another, for example from a Development environment to a Production environment", PreRunE: decode(&options), RunE: options.run, } cmd.Flags().String("to", "", "The namespace where to promote the Integration/Pipe") cmd.Flags().StringP("to-operator", "x", "", "The operator id which will reconcile the promoted Integration/Pipe") cmd.Flags().StringP("output", "o", "", "Output format. One of: json|yaml") cmd.Flags().BoolP("image", "i", false, "Output the container image only") return &cmd, &options } type promoteCmdOptions struct { *RootCmdOptions To string `mapstructure:"to" yaml:",omitempty"` ToOperator string `mapstructure:"to-operator" yaml:",omitempty"` OutputFormat string `mapstructure:"output" yaml:",omitempty"` Image bool `mapstructure:"image" yaml:",omitempty"` } func (o *promoteCmdOptions) validate(_ *cobra.Command, args []string) error { if len(args) != 1 { return errors.New("promote requires an Integration/Pipe name argument") } if o.To == "" { return errors.New("promote requires a destination namespace as --to argument") } if o.To == o.Namespace { return errors.New("source and destination namespaces must be different in order to avoid promoted Integration/Pipe clashes with the source Integration/Pipe") } return nil } func (o *promoteCmdOptions) run(cmd *cobra.Command, args []string) error { if err := o.validate(cmd, args); err != nil { return err } name := args[0] c, err := o.GetCmdClient() if err != nil { return fmt.Errorf("could not retrieve cluster client: %w", err) } if !o.isDryRun() { // Skip these checks if in dry mode opSource, err := operatorInfo(o.Context, c, o.Namespace) if err != nil { return fmt.Errorf("could not retrieve info for Camel K operator source: %w", err) } opDest, err := operatorInfo(o.Context, c, o.To) if err != nil { return fmt.Errorf("could not retrieve info for Camel K operator destination: %w", err) } err = checkOpsCompatibility(cmd, opSource, opDest) if err != nil { return fmt.Errorf("could not verify operators compatibility: %w", err) } } promotePipe := false var sourceIntegration *v1.Integration // We first look if a Pipe with the name exists sourcePipe, err := o.getPipe(c, name) if err != nil && !k8serrors.IsNotFound(err) { return fmt.Errorf("problems looking for Pipe "+name+": %w", err) } if sourcePipe != nil { promotePipe = true } sourceIntegration, err = o.getIntegration(c, name) if err != nil { return fmt.Errorf("could not get Integration "+name+": %w", err) } if sourceIntegration.Status.Phase != v1.IntegrationPhaseRunning { return fmt.Errorf("could not promote an Integration in %s status", sourceIntegration.Status.Phase) } // Image only mode if o.Image { showImageOnly(cmd, sourceIntegration) return nil } if !o.isDryRun() { // Skip these checks if in dry mode err = o.validateDestResources(c, sourceIntegration) if err != nil { return fmt.Errorf("could not validate destination resources: %w", err) } } // Pipe promotion if promotePipe { destPipe := o.editPipe(sourcePipe, sourceIntegration) if o.OutputFormat != "" { return showPipeOutput(cmd, destPipe, o.OutputFormat, c.GetScheme()) } // Ensure the destination namespace has access to the source namespace images err = addSystemPullerRoleBinding(o.Context, c, sourceIntegration.Namespace, destPipe.Namespace) if err != nil { return err } replaced, err := o.replaceResource(destPipe) if !replaced { fmt.Fprintln(cmd.OutOrStdout(), `Promoted Pipe "`+name+`" created`) } else { fmt.Fprintln(cmd.OutOrStdout(), `Promoted Pipe "`+name+`" updated`) } return err } // Plain Integration promotion destIntegration := o.editIntegration(sourceIntegration) if o.OutputFormat != "" { return showIntegrationOutput(cmd, destIntegration, o.OutputFormat) } // Ensure the destination namespace has access to the source namespace images err = addSystemPullerRoleBinding(o.Context, c, sourceIntegration.Namespace, destIntegration.Namespace) if err != nil { return err } replaced, err := o.replaceResource(destIntegration) if !replaced { fmt.Fprintln(cmd.OutOrStdout(), `Promoted Integration "`+name+`" created`) } else { fmt.Fprintln(cmd.OutOrStdout(), `Promoted Integration "`+name+`" updated`) } return err } func checkOpsCompatibility(cmd *cobra.Command, source, dest map[string]string) error { if !compatibleVersions(source["Version"], dest["Version"], cmd) { return fmt.Errorf("source (%s) and destination (%s) Camel K operator versions are not compatible", source["Version"], dest["Version"]) } if !compatibleVersions(source["Runtime Version"], dest["Runtime Version"], cmd) { return fmt.Errorf("source (%s) and destination (%s) Camel K runtime versions are not compatible", source["Runtime Version"], dest["Runtime Version"]) } if source["Registry Address"] != dest["Registry Address"] { return fmt.Errorf("source (%s) and destination (%s) Camel K container images registries are not the same", source["Registry Address"], dest["Registry Address"]) } return nil } func (o *promoteCmdOptions) getPipe(c client.Client, name string) (*v1.Pipe, error) { it := v1.NewPipe(o.Namespace, name) key := k8sclient.ObjectKey{ Name: name, Namespace: o.Namespace, } if err := c.Get(o.Context, key, &it); err != nil { return nil, err } return &it, nil } func (o *promoteCmdOptions) getIntegration(c client.Client, name string) (*v1.Integration, error) { it := v1.NewIntegration(o.Namespace, name) key := k8sclient.ObjectKey{ Name: name, Namespace: o.Namespace, } if err := c.Get(o.Context, key, &it); err != nil { return nil, err } return &it, nil } func (o *promoteCmdOptions) validateDestResources(c client.Client, it *v1.Integration) error { var configmaps []string var secrets []string var pvcs []string var kamelets []string // Mount trait mount, err := toPropertyMap(it.Spec.Traits.Mount) if err != nil { return err } for t, v := range mount { switch t { case "configs": list, ok := v.([]interface{}) if !ok { return fmt.Errorf("invalid %s type: %s, value: %s", t, reflect.TypeOf(v), v) } for _, cn := range list { s, ok := cn.(string) if !ok { return fmt.Errorf("invalid %s type: %s, value: %s", t, reflect.TypeOf(cn), cn) } if conf, parseErr := resource.ParseConfig(s); parseErr == nil { if conf.StorageType() == resource.StorageTypeConfigmap { configmaps = append(configmaps, conf.Name()) } else if conf.StorageType() == resource.StorageTypeSecret { secrets = append(secrets, conf.Name()) } } else { return parseErr } } case "resources": list, ok := v.([]interface{}) if !ok { return fmt.Errorf("invalid %s type: %s, value: %s", t, reflect.TypeOf(v), v) } for _, cn := range list { s, ok := cn.(string) if !ok { return fmt.Errorf("invalid %s type: %s, value: %s", t, reflect.TypeOf(cn), cn) } if conf, parseErr := resource.ParseResource(s); parseErr == nil { if conf.StorageType() == resource.StorageTypeConfigmap { configmaps = append(configmaps, conf.Name()) } else if conf.StorageType() == resource.StorageTypeSecret { secrets = append(secrets, conf.Name()) } } else { return parseErr } } case "volumes": list, ok := v.([]interface{}) if !ok { return fmt.Errorf("invalid %s type: %s, value: %s", t, reflect.TypeOf(v), v) } for _, cn := range list { s, ok := cn.(string) if !ok { return fmt.Errorf("invalid %s type: %s, value: %s", t, reflect.TypeOf(cn), cn) } if conf, parseErr := resource.ParseVolume(s); parseErr == nil { if conf.StorageType() == resource.StorageTypePVC { pvcs = append(pvcs, conf.Name()) } } else { return parseErr } } } } // OpenAPI trait openapi, err := toPropertyMap(it.Spec.Traits.OpenAPI) if err != nil { return err } for k, v := range openapi { if k != "configmaps" { continue } if list, ok := v.([]string); ok { configmaps = append(configmaps, list...) break } } // Kamelets trait kamelet, err := toPropertyMap(it.Spec.Traits.Kamelets) if err != nil { return err } if list, ok := kamelet["list"].(string); ok { kamelets = strings.Split(list, ",") } sourceKamelets, err := o.listKamelets(c, it) if err != nil { return err } kamelets = append(kamelets, sourceKamelets...) anyError := false var errorTrace string for _, name := range configmaps { if !existsCm(o.Context, c, name, o.To) { anyError = true errorTrace += fmt.Sprintf("\n\tConfigmap %s is missing from %s namespace", name, o.To) } } for _, name := range secrets { if !existsSecret(o.Context, c, name, o.To) { anyError = true errorTrace += fmt.Sprintf("\n\tSecret %s is missing from %s namespace", name, o.To) } } for _, name := range pvcs { if !existsPv(o.Context, c, name, o.To) { anyError = true errorTrace += fmt.Sprintf("\n\tPersistentVolume %s is missing from %s namespace", name, o.To) } } for _, name := range kamelets { if !existsKamelet(o.Context, c, name, o.To) { anyError = true errorTrace += fmt.Sprintf("\n\tKamelet %s is missing from %s namespace", name, o.To) } } if anyError { return fmt.Errorf(errorTrace) } return nil } func toPropertyMap(src interface{}) (map[string]interface{}, error) { propMap, err := trait.ToPropertyMap(src) if err != nil { return nil, err } // Migrate legacy configuration properties before promoting if err := trait.MigrateLegacyConfiguration(propMap); err != nil { return nil, err } return propMap, nil } func (o *promoteCmdOptions) listKamelets(c client.Client, it *v1.Integration) ([]string, error) { runtime := v1.RuntimeSpec{ Version: it.Status.RuntimeVersion, Provider: v1.RuntimeProviderQuarkus, } catalog, err := camel.LoadCatalog(o.Context, c, o.Namespace, runtime) if err != nil { return nil, err } kamelets, err := kamelets.ExtractKameletFromSources(o.Context, c, catalog, &kubernetes.Collection{}, it) if err != nil { return nil, err } var filtered []string for _, k := range kamelets { // We must remove any default source/sink if k == "source" || k == "sink" { continue } // We must drop any named configurations match := namedConfRegExp.FindStringSubmatch(k) if len(match) > 0 { filtered = append(filtered, match[1]) } else { filtered = append(filtered, k) } } return filtered, nil } func existsCm(ctx context.Context, c client.Client, name string, namespace string) bool { var obj corev1.ConfigMap key := k8sclient.ObjectKey{ Name: name, Namespace: namespace, } if err := c.Get(ctx, key, &obj); err != nil { return false } return true } func existsSecret(ctx context.Context, c client.Client, name string, namespace string) bool { var obj corev1.Secret key := k8sclient.ObjectKey{ Name: name, Namespace: namespace, } if err := c.Get(ctx, key, &obj); err != nil { return false } return true } func existsPv(ctx context.Context, c client.Client, name string, namespace string) bool { var obj corev1.PersistentVolume key := k8sclient.ObjectKey{ Name: name, Namespace: namespace, } if err := c.Get(ctx, key, &obj); err != nil { return false } return true } func existsKamelet(ctx context.Context, c client.Client, name string, namespace string) bool { var obj v1.Kamelet key := k8sclient.ObjectKey{ Name: name, Namespace: namespace, } if err := c.Get(ctx, key, &obj); err != nil { return false } return true } func (o *promoteCmdOptions) editIntegration(it *v1.Integration) *v1.Integration { dst := v1.NewIntegration(o.To, it.Name) contImage := it.Status.Image dst.Spec = *it.Spec.DeepCopy() dst.Annotations = cloneAnnotations(it.Annotations, o.ToOperator) dst.Labels = cloneLabels(it.Labels) if dst.Spec.Traits.Container == nil { dst.Spec.Traits.Container = &traitv1.ContainerTrait{} } dst.Spec.Traits.Container.Image = contImage return &dst } // Return all annotations overriding the operator Id if provided. func cloneAnnotations(ann map[string]string, operatorID string) map[string]string { operatorIDAnnotationSet := false newMap := make(map[string]string) for k, v := range ann { if k == v1.OperatorIDAnnotation { if operatorID != "" { newMap[v1.OperatorIDAnnotation] = operatorID operatorIDAnnotationSet = true } } else { newMap[k] = v } } if !operatorIDAnnotationSet && operatorID != "" { newMap[v1.OperatorIDAnnotation] = operatorID } return newMap } // Return all labels. The method is a reference if in the future we need to apply any filtering. func cloneLabels(lbs map[string]string) map[string]string { newMap := make(map[string]string) for k, v := range lbs { newMap[k] = v } return newMap } func (o *promoteCmdOptions) editPipe(kb *v1.Pipe, it *v1.Integration) *v1.Pipe { dst := v1.NewPipe(o.To, kb.Name) dst.Spec = *kb.Spec.DeepCopy() dst.Annotations = cloneAnnotations(kb.Annotations, o.ToOperator) dst.Labels = cloneLabels(kb.Labels) contImage := it.Status.Image if dst.Spec.Integration == nil { dst.Spec.Integration = &v1.IntegrationSpec{} } if dst.Spec.Integration.Traits.Container == nil { dst.Spec.Integration.Traits.Container = &traitv1.ContainerTrait{} } dst.Spec.Integration.Traits.Container.Image = contImage if dst.Spec.Source.Ref != nil { dst.Spec.Source.Ref.Namespace = o.To } if dst.Spec.Sink.Ref != nil { dst.Spec.Sink.Ref.Namespace = o.To } if dst.Spec.Steps != nil { for _, step := range dst.Spec.Steps { if step.Ref != nil { step.Ref.Namespace = o.To } } } return &dst } func (o *promoteCmdOptions) replaceResource(res k8sclient.Object) (bool, error) { return kubernetes.ReplaceResource(o.Context, o._client, res) } func (o *promoteCmdOptions) isDryRun() bool { return o.OutputFormat != "" || o.Image } // RoleBinding is required to allow access to images in one namespace // by another namespace. Without this on rbac-enabled clusters, the // image cannot be pulled. func addSystemPullerRoleBinding(ctx context.Context, c client.Client, sourceNS string, destNS string) error { rb := &rbacv1.RoleBinding{ TypeMeta: metav1.TypeMeta{ Kind: "RoleBinding", APIVersion: "rbac.authorization.k8s.io/v1", }, ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-image-puller", destNS), Namespace: sourceNS, }, Subjects: []rbacv1.Subject{ { Kind: "ServiceAccount", Name: "default", Namespace: destNS, }, }, RoleRef: rbacv1.RoleRef{ Kind: "ClusterRole", Name: "system:image-puller", }, } applier := c.ServerOrClientSideApplier() err := applier.Apply(ctx, rb) return err } func showImageOnly(cmd *cobra.Command, integration *v1.Integration) { fmt.Fprintln(cmd.OutOrStdout(), integration.Status.Image) }