pkg/cmd/bind.go (416 lines of code) (raw):

/* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package cmd import ( "encoding/json" "errors" "fmt" "strings" v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" cclient "github.com/apache/camel-k/v2/pkg/client" "github.com/apache/camel-k/v2/pkg/trait" "github.com/apache/camel-k/v2/pkg/util/kubernetes" "github.com/apache/camel-k/v2/pkg/util/reference" "github.com/apache/camel-k/v2/pkg/util/uri" "github.com/spf13/cobra" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/cli-runtime/pkg/printers" "sigs.k8s.io/controller-runtime/pkg/client" ) // newCmdBind --. func newCmdBind(rootCmdOptions *RootCmdOptions) (*cobra.Command, *bindCmdOptions) { options := bindCmdOptions{ RootCmdOptions: rootCmdOptions, } cmd := cobra.Command{ Use: "bind [source] [sink] ...", Short: "Bind Kubernetes resources, such as Kamelets, in an integration flow.", Long: "Bind Kubernetes resources, such as Kamelets, in an integration flow. Endpoints are expected in the format \"[[apigroup/]version:]kind:[namespace/]name\" or plain Camel URIs.", PersistentPreRunE: decode(&options), PreRunE: options.preRunE, RunE: options.runE, Annotations: make(map[string]string), } cmd.Flags().StringArrayP("connect", "c", nil, "A ServiceBinding or Provisioned Service that the integration should bind to, specified as [[apigroup/]version:]kind:[namespace/]name") cmd.Flags().String("error-handler", "", `Add error handler (none|log|sink:<endpoint>). Sink endpoints are expected in the format "[[apigroup/]version:]kind:[namespace/]name", plain Camel URIs or Kamelet name.`) cmd.Flags().String("name", "", "Name for the binding") cmd.Flags().StringP("output", "o", "", "Output format. One of: json|yaml") cmd.Flags().StringArrayP("property", "p", nil, `Add a binding property in the form of "source.<key>=<value>", "sink.<key>=<value>", "error-handler.<key>=<value>" or "step-<n>.<key>=<value> where <n> is the step order starting from 1"`) cmd.Flags().Bool("skip-checks", false, "Do not verify the binding for compliance with Kamelets and other Kubernetes resources") cmd.Flags().StringArray("step", nil, `Add binding steps as Kubernetes resources. Endpoints are expected in the format "[[apigroup/]version:]kind:[namespace/]name", plain Camel URIs or Kamelet name.`) cmd.Flags().StringArrayP("trait", "t", nil, `Add a trait to the corresponding Integration.`) cmd.Flags().StringP("operator-id", "x", "camel-k", "Operator id selected to manage this Pipe.") cmd.Flags().StringArray("annotation", nil, "Add an annotation to the Pipe. E.g. \"--annotation my.company=hello\"") cmd.Flags().Bool("force", false, "Force creation of Pipe regardless of potential misconfiguration.") cmd.Flags().String("service-account", "", "The SA to use to run this binding") return &cmd, &options } const ( sourceKey = "source" sinkKey = "sink" stepKeyPrefix = "step-" errorHandlerKey = "error-handler" ) type bindCmdOptions struct { *RootCmdOptions ErrorHandler string `mapstructure:"error-handler" yaml:",omitempty"` Name string `mapstructure:"name" yaml:",omitempty"` Connects []string `mapstructure:"connects" yaml:",omitempty"` OutputFormat string `mapstructure:"output" yaml:",omitempty"` Properties []string `mapstructure:"properties" yaml:",omitempty"` SkipChecks bool `mapstructure:"skip-checks" yaml:",omitempty"` Steps []string `mapstructure:"steps" yaml:",omitempty"` Traits []string `mapstructure:"traits" yaml:",omitempty"` OperatorID string `mapstructure:"operator-id" yaml:",omitempty"` Annotations []string `mapstructure:"annotations" yaml:",omitempty"` Force bool `mapstructure:"force" yaml:",omitempty"` ServiceAccount string `mapstructure:"service-account" yaml:",omitempty"` } func (o *bindCmdOptions) preRunE(cmd *cobra.Command, args []string) error { if o.OutputFormat != "" { // let the command work in offline mode cmd.Annotations[offlineCommandLabel] = "true" } return o.RootCmdOptions.preRun(cmd, args) } func (o *bindCmdOptions) runE(cmd *cobra.Command, args []string) error { if err := o.validate(cmd, args); err != nil { return err } if err := o.run(cmd, args); err != nil { fmt.Fprintln(cmd.OutOrStdout(), err.Error()) } return nil } func (o *bindCmdOptions) validate(cmd *cobra.Command, args []string) error { if len(args) > 2 { return errors.New("too many arguments: expected source and sink") } else if len(args) < 2 { return errors.New("source or sink arguments are missing") } if o.OperatorID == "" { return fmt.Errorf("cannot use empty operator id") } for _, annotation := range o.Annotations { parts := strings.SplitN(annotation, "=", 2) if len(parts) != 2 { return fmt.Errorf(`invalid annotation specification %s. Expected "<annotationkey>=<annotationvalue>"`, annotation) } } for _, p := range o.Properties { if _, _, _, err := o.parseProperty(p); err != nil { return err } } if !o.SkipChecks { source, err := o.decode(args[0], sourceKey) if err != nil { return err } if err := o.checkCompliance(cmd, source); err != nil { return err } sink, err := o.decode(args[1], sinkKey) if err != nil { return err } if err := o.checkCompliance(cmd, sink); err != nil { return err } for idx, stepDesc := range o.Steps { stepKey := fmt.Sprintf("%s%d", stepKeyPrefix, idx) step, err := o.decode(stepDesc, stepKey) if err != nil { return err } if err := o.checkCompliance(cmd, step); err != nil { return err } } } var client cclient.Client var err error if !isOfflineCommand(cmd) { client, err = o.GetCmdClient() if err != nil { return err } } catalog := trait.NewCatalog(client) return validateTraits(catalog, extractTraitNames(o.Traits)) } func (o *bindCmdOptions) run(cmd *cobra.Command, args []string) error { client, err := o.GetCmdClient() if err != nil { return err } source, err := o.decode(args[0], sourceKey) if err != nil { return err } sink, err := o.decode(args[1], sinkKey) if err != nil { return err } name := o.nameFor(source, sink) binding := v1.Pipe{ ObjectMeta: metav1.ObjectMeta{ Namespace: o.Namespace, Name: name, }, Spec: v1.PipeSpec{ Source: source, Sink: sink, }, } if o.ErrorHandler != "" { if errorHandler, err := o.parseErrorHandler(); err == nil { binding.Spec.ErrorHandler = errorHandler } else { return err } } if len(o.Steps) > 0 { binding.Spec.Steps = make([]v1.Endpoint, 0) for idx, stepDesc := range o.Steps { stepIndex := idx + 1 stepKey := fmt.Sprintf("%s%d", stepKeyPrefix, stepIndex) step, err := o.decode(stepDesc, stepKey) if err != nil { return err } binding.Spec.Steps = append(binding.Spec.Steps, step) } } for _, item := range o.Connects { o.Traits = append(o.Traits, fmt.Sprintf("service-binding.services=%s", item)) } if len(o.Traits) > 0 { if binding.Spec.Integration == nil { binding.Spec.Integration = &v1.IntegrationSpec{} } catalog := trait.NewCatalog(client) if err := configureTraits(o.Traits, &binding.Spec.Integration.Traits, catalog); err != nil { return err } } if binding.Annotations == nil { binding.Annotations = make(map[string]string) } if o.ServiceAccount != "" { binding.Spec.ServiceAccountName = o.ServiceAccount } if !isOfflineCommand(cmd) && o.OperatorID != "" { if err := verifyOperatorID(o.Context, client, o.OperatorID); err != nil { if o.Force { o.PrintfVerboseErrf(cmd, "%s, use --force option or make sure to use a proper operator id", err.Error()) } else { return err } } } // --operator-id={id} is a syntax sugar for '--annotation camel.apache.org/operator.id={id}' binding.SetOperatorID(strings.TrimSpace(o.OperatorID)) for _, annotation := range o.Annotations { parts := strings.SplitN(annotation, "=", 2) if len(parts) == 2 { binding.Annotations[parts[0]] = parts[1] } } if o.OutputFormat != "" { return showPipeOutput(cmd, &binding, o.OutputFormat, client.GetScheme()) } replaced, err := kubernetes.ReplaceResource(o.Context, client, &binding) if err != nil { return err } if !replaced { fmt.Fprintln(cmd.OutOrStdout(), `binding "`+name+`" created`) } else { fmt.Fprintln(cmd.OutOrStdout(), `binding "`+name+`" updated`) } return nil } func showPipeOutput(cmd *cobra.Command, binding *v1.Pipe, outputFormat string, scheme runtime.ObjectTyper) error { printer := printers.NewTypeSetter(scheme) printer.Delegate = &kubernetes.CLIPrinter{ Format: outputFormat, } return printer.PrintObj(binding, cmd.OutOrStdout()) } func (o *bindCmdOptions) parseErrorHandler() (*v1.ErrorHandlerSpec, error) { errHandlMap := make(map[string]interface{}) errHandlType, errHandlValue, err := parseErrorHandlerByType(o.ErrorHandler) if err != nil { return nil, err } switch errHandlType { case "none": errHandlMap["none"] = nil case "log": errHandlMap["log"] = nil case "sink": sinkSpec, err := o.decode(errHandlValue, errorHandlerKey) if err != nil { return nil, err } errHandlMap["sink"] = map[string]interface{}{ "endpoint": sinkSpec, } default: return nil, fmt.Errorf("invalid error handler type %s", o.ErrorHandler) } errHandlMarshalled, err := json.Marshal(&errHandlMap) if err != nil { return nil, err } return &v1.ErrorHandlerSpec{RawMessage: errHandlMarshalled}, nil } func parseErrorHandlerByType(value string) (string, string, error) { errHandlSplit := strings.SplitN(value, ":", 2) if (errHandlSplit[0] == "sink") && len(errHandlSplit) != 2 { return "", "", fmt.Errorf("invalid error handler syntax. Type %s needs a configuration (ie %s:value)", errHandlSplit[0], errHandlSplit[0]) } if len(errHandlSplit) > 1 { return errHandlSplit[0], errHandlSplit[1], nil } return errHandlSplit[0], "", nil } func (o *bindCmdOptions) decode(res string, key string) (v1.Endpoint, error) { refConverter := reference.NewConverter(reference.KameletPrefix) endpoint := v1.Endpoint{} explicitProps := o.getProperties(key) props, err := o.asEndpointProperties(explicitProps) if err != nil { return endpoint, err } endpoint.Properties = props ref, err := refConverter.FromString(res) if err != nil { if uri.HasCamelURIFormat(res) { endpoint.URI = &res return endpoint, nil } return endpoint, err } endpoint.Ref = &ref if endpoint.Ref.Namespace == "" { endpoint.Ref.Namespace = o.Namespace } embeddedProps, err := refConverter.PropertiesFromString(res) if err != nil { return endpoint, err } if len(embeddedProps) > 0 { allProps := make(map[string]string) for k, v := range explicitProps { allProps[k] = v } for k, v := range embeddedProps { allProps[k] = v } props, err := o.asEndpointProperties(allProps) if err != nil { return endpoint, err } endpoint.Properties = props } return endpoint, nil } func (o *bindCmdOptions) nameFor(source, sink v1.Endpoint) string { if o.Name != "" { return o.Name } sourcePart := o.nameForEndpoint(source) sinkPart := o.nameForEndpoint(sink) name := fmt.Sprintf("%s-to-%s", sourcePart, sinkPart) return kubernetes.SanitizeName(name) } func (o *bindCmdOptions) nameForEndpoint(endpoint v1.Endpoint) string { if endpoint.URI != nil { return uri.GetComponent(*endpoint.URI) } if endpoint.Ref != nil { return endpoint.Ref.Name } return "" } func (o *bindCmdOptions) asEndpointProperties(props map[string]string) (*v1.EndpointProperties, error) { if len(props) == 0 { return nil, nil } data, err := json.Marshal(props) if err != nil { return nil, err } return &v1.EndpointProperties{ RawMessage: v1.RawMessage(data), }, nil } func (o *bindCmdOptions) getProperties(refType string) map[string]string { props := make(map[string]string) for _, p := range o.Properties { tp, k, v, err := o.parseProperty(p) if err != nil { continue } if tp == refType { props[k] = v } } return props } func (o *bindCmdOptions) parseProperty(prop string) (string, string, string, error) { parts := strings.SplitN(prop, "=", 2) if len(parts) != 2 { return "", "", "", fmt.Errorf(`property %q does not follow format "[source|sink|error-handler|step-<n>].<key>=<value>"`, prop) } keyParts := strings.SplitN(parts[0], ".", 2) if len(keyParts) != 2 { return "", "", "", fmt.Errorf(`property key %q does not follow format "[source|sink|error-handler|step-<n>].<key>"`, parts[0]) } isSource := keyParts[0] == sourceKey isSink := keyParts[0] == sinkKey isErrorHandler := keyParts[0] == errorHandlerKey isStep := strings.HasPrefix(keyParts[0], stepKeyPrefix) if !isSource && !isSink && !isStep && !isErrorHandler { return "", "", "", fmt.Errorf(`property key %q does not start with "source.", "sink.", "error-handler." or "step-<n>."`, parts[0]) } return keyParts[0], keyParts[1], parts[1], nil } func (o *bindCmdOptions) checkCompliance(cmd *cobra.Command, endpoint v1.Endpoint) error { if endpoint.Ref != nil && endpoint.Ref.Kind == "Kamelet" { c, err := o.GetCmdClient() if err != nil { return err } key := client.ObjectKey{ Namespace: endpoint.Ref.Namespace, Name: endpoint.Ref.Name, } kamelet := v1.Kamelet{} if err := c.Get(o.Context, key, &kamelet); err != nil { if k8serrors.IsNotFound(err) { // Kamelet may be in the operator namespace, but we currently don't have a way to determine it: we just warn fmt.Fprintf(cmd.ErrOrStderr(), "Warning: Kamelet %q not found in namespace %q\n", key.Name, key.Namespace) return nil } return err } if kamelet.Spec.Definition != nil && len(kamelet.Spec.Definition.Required) > 0 { pMap, err := endpoint.Properties.GetPropertyMap() if err != nil { return err } for _, reqProp := range kamelet.Spec.Definition.Required { found := false if endpoint.Properties != nil { if _, contains := pMap[reqProp]; contains { found = true } } if !found && len(o.Connects) == 0 { return fmt.Errorf("binding is missing required property %q for Kamelet %q", reqProp, key.Name) } } } } return nil }