pkg/trait/kamelets.go (307 lines of code) (raw):

/* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package trait import ( "errors" "fmt" "path/filepath" "sort" "strconv" "strings" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/ptr" v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" traitv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1/trait" "github.com/apache/camel-k/v2/pkg/kamelet/repository" "github.com/apache/camel-k/v2/pkg/metadata" "github.com/apache/camel-k/v2/pkg/platform" "github.com/apache/camel-k/v2/pkg/util" "github.com/apache/camel-k/v2/pkg/util/boolean" "github.com/apache/camel-k/v2/pkg/util/camel" "github.com/apache/camel-k/v2/pkg/util/digest" "github.com/apache/camel-k/v2/pkg/util/dsl" ) const ( kameletsTraitID = "kamelets" kameletsTraitOrder = 450 contentKey = "content" KameletLocationProperty = "camel.component.kamelet.location" KameletErrorHandler = "camel.component.kamelet.no-error-handler" kameletMountPointAnnotation = "camel.apache.org/kamelet.mount-point" ) var kameletVersionProperty = fmt.Sprintf("?%s=", v1.KameletVersionProperty) type kameletsTrait struct { BaseTrait traitv1.KameletsTrait `property:",squash"` } func newKameletsTrait() Trait { return &kameletsTrait{ BaseTrait: NewBaseTrait(kameletsTraitID, kameletsTraitOrder), } } func (t *kameletsTrait) Configure(e *Environment) (bool, *TraitCondition, error) { if e.Integration == nil { return false, nil, nil } if !ptr.Deref(t.Enabled, true) { return false, NewIntegrationConditionUserDisabled("Kamelets"), nil } if !e.IntegrationInPhase(v1.IntegrationPhaseInitialization) && !e.IntegrationInRunningPhases() { return false, nil, nil } if ptr.Deref(t.Auto, true) { var kamelets []string _, err := e.ConsumeMeta(false, func(meta metadata.IntegrationMetadata) bool { util.StringSliceUniqueConcat(&kamelets, meta.Kamelets) return true }) if err != nil { return false, nil, err } if len(kamelets) > 0 { sort.Strings(kamelets) t.List = strings.Join(kamelets, ",") } } return len(t.getKameletKeys(false)) > 0, nil, nil } func (t *kameletsTrait) Apply(e *Environment) error { if e.IntegrationInPhase(v1.IntegrationPhaseInitialization) || e.IntegrationInRunningPhases() { if err := t.addKamelets(e); err != nil { return err } } return nil } // collectKamelets load a Kamelet specification setting the specific version specification. func (t *kameletsTrait) collectKamelets(e *Environment) (map[string]*v1.Kamelet, error) { repo, err := repository.NewForPlatform(e.Ctx, e.Client, e.Platform, e.Integration.Namespace, platform.GetOperatorNamespace()) if err != nil { return nil, err } kamelets := make(map[string]*v1.Kamelet) missingKamelets := make([]string, 0) availableKamelets := make([]string, 0) bundledKamelets := make([]string, 0) for _, kml := range strings.Split(t.List, ",") { name := getKameletKey(kml, false) if !v1.ValidKameletName(name) { // Skip kamelet sink and source id continue } kamelet, err := repo.Get(e.Ctx, name) if err != nil { return nil, err } if kamelet == nil { missingKamelets = append(missingKamelets, name) continue } else { availableKamelets = append(availableKamelets, name) } if kamelet.IsBundled() { bundledKamelets = append(bundledKamelets, name) } // We control which version to use (if any is specified) clonedKamelet, err := kamelet.CloneWithVersion(getKameletVersion(kml)) if err != nil { return nil, err } kamelets[clonedKamelet.Name] = clonedKamelet } sort.Strings(availableKamelets) sort.Strings(missingKamelets) sort.Strings(bundledKamelets) if len(missingKamelets) > 0 { message := fmt.Sprintf("kamelets [%s] not found in %s repositories", strings.Join(missingKamelets, ","), repo.String()) e.Integration.Status.SetCondition( v1.IntegrationConditionKameletsAvailable, corev1.ConditionFalse, v1.IntegrationConditionKameletsAvailableReason, message, ) return nil, errors.New(message) } // TODO: // We list the Kamelets coming from a bundle. We want to warn the user // that in the future we'll use the specification coming from the dependency runtime // instead of using the one installed in the cluster. // It may be a good idea in the future to let the user specify the catalog dependency to use // in order to override the one coming from Apache catalog if len(bundledKamelets) > 0 { message := fmt.Sprintf("using bundled kamelets [%s]: make sure the Kamelet specifications is compatible with this Integration runtime."+ " This feature is deprecated as in the future we will use directly the specification coming from the Kamelet catalog dependency jar.", strings.Join(bundledKamelets, ",")) e.Integration.Status.SetCondition( v1.IntegrationConditionType("KameletsDeprecationNotice"), corev1.ConditionTrue, "KameletsDeprecationNotice", message, ) } e.Integration.Status.SetCondition( v1.IntegrationConditionKameletsAvailable, corev1.ConditionTrue, v1.IntegrationConditionKameletsAvailableReason, fmt.Sprintf("kamelets [%s] found in %s repositories", strings.Join(availableKamelets, ","), repo.String()), ) return kamelets, nil } func (t *kameletsTrait) addKamelets(e *Environment) error { if len(t.getKameletKeys(false)) == 0 { return nil } kamelets, err := t.collectKamelets(e) if err != nil { return err } kb := newKameletBundle() for _, kamelet := range kamelets { if err := t.addKameletAsSource(e, kamelet); err != nil { return err } // Adding dependencies from Kamelets util.StringSliceUniqueConcat(&e.Integration.Status.Dependencies, kamelet.Spec.Dependencies) // Add to Kamelet bundle configmap kb.add(kamelet) } bundleConfigmaps, err := kb.toConfigmaps(e.Integration.Name, e.Integration.Namespace) if err != nil { return err } // set kamelets runtime location if e.ApplicationProperties == nil { e.ApplicationProperties = map[string]string{} } for _, cm := range bundleConfigmaps { kameletMountPoint := fmt.Sprintf("%s/%s", t.getMountPoint(), cm.Name) cm.Annotations[kameletMountPointAnnotation] = kameletMountPoint e.Resources.Add(cm) if e.ApplicationProperties[KameletLocationProperty] == "" { e.ApplicationProperties[KameletLocationProperty] = fmt.Sprintf("file:%s", kameletMountPoint) } else { e.ApplicationProperties[KameletLocationProperty] += fmt.Sprintf(",file:%s", kameletMountPoint) } } e.ApplicationProperties[KameletLocationProperty] += ",classpath:/kamelets" // required because of https://issues.apache.org/jira/browse/CAMEL-21599 e.ApplicationProperties[KameletErrorHandler] = "false" // resort dependencies sort.Strings(e.Integration.Status.Dependencies) return nil } // This func will add a Kamelet as a generated Integration source. The source included here is going to be used in order to parse the Kamelet // for any component or capability (ie, rest) which is included in the Kamelet spec itself. However, the generated source is marked as coming `FromKamelet`. // When mounting the sources, these generated sources won't be mounted as sources but as Kamelet instead. func (t *kameletsTrait) addKameletAsSource(e *Environment, kamelet *v1.Kamelet) error { sources := make([]v1.SourceSpec, 0) if kamelet.Spec.Template != nil { flowData, err := dsl.TemplateToYamlDSL(*kamelet.Spec.Template, kamelet.Name) if err != nil { return err } flowSource := v1.SourceSpec{ DataSpec: v1.DataSpec{ Name: fmt.Sprintf("%s.yaml", kamelet.Name), Content: string(flowData), }, Language: v1.LanguageYaml, } flowSource, err = integrationSourceFromKameletSource(e, kamelet, flowSource, fmt.Sprintf("%s-kamelet-%s-template", e.Integration.Name, kamelet.Name)) if err != nil { return err } sources = append(sources, flowSource) } for idx, s := range kamelet.Spec.Sources { intSource, err := integrationSourceFromKameletSource(e, kamelet, s, fmt.Sprintf("%s-kamelet-%s-%03d", e.Integration.Name, kamelet.Name, idx)) if err != nil { return err } sources = append(sources, intSource) } for _, source := range sources { replaced := false for idx, existing := range e.Integration.Status.GeneratedSources { if existing.Name == source.Name { replaced = true e.Integration.Status.GeneratedSources[idx] = source } } if !replaced { e.Integration.Status.GeneratedSources = append(e.Integration.Status.GeneratedSources, source) } } return nil } func (t *kameletsTrait) getKameletKeys(withVersion bool) []string { answer := make([]string, 0) for _, item := range strings.Split(t.List, ",") { i := getKameletKey(item, withVersion) if i != "" && v1.ValidKameletName(i) { util.StringSliceUniqueAdd(&answer, i) } } sort.Strings(answer) return answer } func (t *kameletsTrait) getMountPoint() string { if t.MountPoint == "" { return filepath.Join(camel.BasePath, "kamelets") } return t.MountPoint } func getKameletKey(item string, withVersion bool) string { i := strings.Trim(item, " \t\"") if strings.Contains(i, "/") { i = strings.SplitN(i, "/", 2)[0] } if strings.Contains(i, kameletVersionProperty) { versionedKamelet := strings.SplitN(i, kameletVersionProperty, 2) if withVersion { i = fmt.Sprintf("%s-%s", versionedKamelet[0], versionedKamelet[1]) } else { i = versionedKamelet[0] } } return i } func getKameletVersion(item string) string { if strings.Contains(item, fmt.Sprintf("?%s=", v1.KameletVersionProperty)) { versionedKamelet := strings.SplitN(item, kameletVersionProperty, 2) return versionedKamelet[1] } return "" } func integrationSourceFromKameletSource(e *Environment, kamelet *v1.Kamelet, source v1.SourceSpec, name string) (v1.SourceSpec, error) { if source.Type == v1.SourceTypeTemplate { // Kamelets must be named "<kamelet-name>.extension" language := source.InferLanguage() source.Name = fmt.Sprintf("%s.%s", kamelet.Name, string(language)) } source.FromKamelet = true if source.DataSpec.ContentRef != "" { return source, nil } // Create configmaps to avoid storing kamelet definitions in the integration CR // Compute the input digest and store it along with the configmap hash, err := digest.ComputeForSource(source) if err != nil { return v1.SourceSpec{}, err } cm := initializeConfigmapKameletSource(source, hash, name, e.Integration.Namespace, e.Integration.Name, kamelet.Name) e.Resources.Add(&cm) target := source.DeepCopy() target.Content = "" target.ContentRef = name target.ContentKey = contentKey return *target, nil } func initializeConfigmapKameletSource(source v1.SourceSpec, hash, name, namespace, itName, kamName string) corev1.ConfigMap { return corev1.ConfigMap{ TypeMeta: metav1.TypeMeta{ Kind: "ConfigMap", APIVersion: "v1", }, ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: namespace, Labels: map[string]string{ "camel.apache.org/integration": itName, "camel.apache.org/kamelet": kamName, }, Annotations: map[string]string{ sourceLanguageAnnotation: string(source.Language), sourceNameAnnotation: name, sourceCompressionAnnotation: strconv.FormatBool(source.Compression), "camel.apache.org/source.generated": boolean.TrueString, "camel.apache.org/source.type": string(source.Type), "camel.apache.org/source.digest": hash, }, }, Data: map[string]string{ contentKey: source.Content, }, } }