pkg/trait/trait_types.go (456 lines of code) (raw):

/* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package trait import ( "context" "fmt" "path/filepath" "regexp" "sort" "strings" appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" serving "knative.dev/serving/pkg/apis/serving/v1" v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" "github.com/apache/camel-k/v2/pkg/client" "github.com/apache/camel-k/v2/pkg/metadata" "github.com/apache/camel-k/v2/pkg/platform" "github.com/apache/camel-k/v2/pkg/util/camel" "github.com/apache/camel-k/v2/pkg/util/kubernetes" "github.com/apache/camel-k/v2/pkg/util/log" ) const ( sourceLanguageAnnotation = "camel.apache.org/source.language" sourceLoaderAnnotation = "camel.apache.org/source.loader" sourceNameAnnotation = "camel.apache.org/source.name" sourceCompressionAnnotation = "camel.apache.org/source.compression" defaultContainerPortName = "http" // Knative does not want name=http, it only supports http1 (HTTP/1) and h2c (HTTP/2) // https://github.com/knative/specs/blob/main/specs/serving/runtime-contract.md#protocols-and-ports defaultKnativeContainerPortName = "h2c" secretStorageType = "secret" configmapStorageType = "configmap" pvcStorageType = "pvc" emptyDirStorageType = "emptyDir" ) var capabilityDynamicProperty = regexp.MustCompile(`(\$\{([^}]*)\})`) // Identifiable represent an identifiable type. type Identifiable interface { ID() ID } // ID uniquely identifies a trait. type ID string // Trait is the interface of all traits. type Trait interface { Identifiable client.Injectable // Configure the trait Configure(environment *Environment) (bool, *TraitCondition, error) // Apply executes a customization of the Environment Apply(environment *Environment) error // InfluencesKit determines if the trait has any influence on Integration Kits InfluencesKit() bool // IsPlatformTrait marks all fundamental traits that allow the platform to work IsPlatformTrait() bool // RequiresIntegrationPlatform indicates that the trait cannot work without an integration platform set RequiresIntegrationPlatform() bool // IsAllowedInProfile tells if the trait supports the given profile IsAllowedInProfile(traitProfile v1.TraitProfile) bool // Order is the order in which the trait should be executed in the normal flow Order() int } // Comparable is the interface exposing comparable funcs. type Comparable interface { Matches(trait Trait) bool } // ComparableTrait is the interface used to compare two traits between them. type ComparableTrait interface { Trait Comparable } // A list of named orders, useful for correctly binding addons. const ( // TraitOrderBeforeControllerCreation can be used to inject configuration such as properties and environment variables // into the running integration, before the actual controller is created. TraitOrderBeforeControllerCreation = 850 // TraitOrderControllerSelection can be used if you intend to provide an alternative controller for the integration // (e.g. Deployment, CronJob, ...). TraitOrderControllerSelection = 950 // TraitOrderPostProcessResources is executed after all other traits are executed (except for important core traits such as // the "owner" trait), so it can be used to post-process generated resources before their actual creation. TraitOrderPostProcessResources = 2450 ) func NewBaseTrait(id string, order int) BaseTrait { return BaseTrait{ TraitID: ID(id), ExecutionOrder: order, L: log.Log.WithName("traits").WithValues("trait", id), } } func NewBasePlatformTrait(id string, order int) BasePlatformTrait { return BasePlatformTrait{ BaseTrait{ TraitID: ID(id), ExecutionOrder: order, L: log.Log.WithName("traits").WithValues("trait", id), }, } } // BaseTrait is the root trait with noop implementations for hooks. type BaseTrait struct { TraitID ID `json:"-"` Client client.Client `json:"-"` ExecutionOrder int `json:"-"` L log.Logger `json:"-"` } // ID returns the identifier of the trait. func (trait *BaseTrait) ID() ID { return trait.TraitID } // InjectClient implements client.ClientInject and allows to inject a client into the trait. func (trait *BaseTrait) InjectClient(c client.Client) { trait.Client = c } // InfluencesKit determines if the trait has any influence on Integration Kits. func (trait *BaseTrait) InfluencesKit() bool { return false } // IsPlatformTrait marks all fundamental traits that allow the platform to work. func (trait *BaseTrait) IsPlatformTrait() bool { return false } // RequiresIntegrationPlatform indicates that the trait cannot work without an integration platform set. func (trait *BaseTrait) RequiresIntegrationPlatform() bool { // All traits require a platform by default return true } // IsAllowedInProfile returns true for any profile by default. func (trait *BaseTrait) IsAllowedInProfile(v1.TraitProfile) bool { return true } // Order contains the order value provided during initialization. func (trait *BaseTrait) Order() int { return trait.ExecutionOrder } // BasePlatformTrait is the root for platform traits with noop implementations for hooks. type BasePlatformTrait struct { BaseTrait } // IsPlatformTrait marks all fundamental traits that allow the platform to work. func (trait *BasePlatformTrait) IsPlatformTrait() bool { return true } // ControllerStrategySelector is the interface for traits that can determine the kind of controller that will run the integration. type ControllerStrategySelector interface { // SelectControllerStrategy tells if the trait with current configuration can select a specific controller to use SelectControllerStrategy(env *Environment) (*ControllerStrategy, error) // ControllerStrategySelectorOrder returns the order (priority) of the controller strategy selector ControllerStrategySelectorOrder() int } // An Environment provides the context for the execution of the traits. // //nolint:containedctx type Environment struct { CamelCatalog *camel.RuntimeCatalog Catalog *Catalog // The Go standard context for the traits execution Ctx context.Context // The client to the API server Client client.Client // The active Platform Platform *v1.IntegrationPlatform // The active IntegrationProfile IntegrationProfile *v1.IntegrationProfile // The current Integration Integration *v1.Integration // The IntegrationKit associated to the Integration IntegrationKit *v1.IntegrationKit // The IntegrationKits to be created for the Integration IntegrationKits []v1.IntegrationKit // The resources owned by the Integration that are applied to the API server Resources *kubernetes.Collection PostActions []func(*Environment) error PostStepProcessors []func(*Environment) error PostProcessors []func(*Environment) error Pipeline []v1.Task ConfiguredTraits []Trait ExecutedTraits []Trait EnvVars []corev1.EnvVar ApplicationProperties map[string]string } // ControllerStrategy is used to determine the kind of controller that needs to be created for the integration. type ControllerStrategy string // List of controller strategies. const ( ControllerStrategyDeployment ControllerStrategy = "deployment" ControllerStrategyKnativeService ControllerStrategy = "knative-service" ControllerStrategyCronJob ControllerStrategy = "cron-job" DefaultControllerStrategy = ControllerStrategyDeployment ) func (e *Environment) GetTrait(id ID) Trait { for _, t := range e.ExecutedTraits { if t.ID() == id { return t } } return nil } func (e *Environment) IntegrationInPhase(phases ...v1.IntegrationPhase) bool { if e.Integration == nil { return false } for _, phase := range phases { if e.Integration.Status.Phase == phase { return true } } return false } func (e *Environment) IntegrationInRunningPhases() bool { return e.IntegrationInPhase(v1.IntegrationPhaseDeploying, v1.IntegrationPhaseRunning, v1.IntegrationPhaseError) } func (e *Environment) IntegrationKitInPhase(phases ...v1.IntegrationKitPhase) bool { if e.IntegrationKit == nil { return false } for _, phase := range phases { if e.IntegrationKit.Status.Phase == phase { return true } } return false } func (e *Environment) PlatformInPhase(phases ...v1.IntegrationPlatformPhase) bool { if e.Platform == nil { return false } for _, phase := range phases { if e.Platform.Status.Phase == phase { return true } } return false } func (e *Environment) InPhase(c v1.IntegrationKitPhase, i v1.IntegrationPhase) bool { return e.IntegrationKitInPhase(c) && e.IntegrationInPhase(i) } // DetermineProfile determines the TraitProfile of the environment. // First looking at the Integration.Spec for a Profile, // next looking at the IntegrationKit.Spec // and lastly the Platform Profile. func (e *Environment) DetermineProfile() v1.TraitProfile { if e.Integration != nil { if e.Integration.Status.Profile != "" { return e.Integration.Status.Profile } if e.Integration.Spec.Profile != "" { return e.Integration.Spec.Profile } } if e.IntegrationKit != nil && e.IntegrationKit.Spec.Profile != "" { return e.IntegrationKit.Spec.Profile } if e.Platform != nil { return platform.GetTraitProfile(e.Platform) } return v1.DefaultTraitProfile } // DetermineControllerStrategy determines the type of controller that should be used for the integration. func (e *Environment) DetermineControllerStrategy() (ControllerStrategy, error) { defaultStrategy := DefaultControllerStrategy for _, creator := range e.getControllerStrategyChoosers() { if strategy, err := creator.SelectControllerStrategy(e); err != nil { return defaultStrategy, err } else if strategy != nil { return *strategy, nil } } return defaultStrategy, nil } // determineDefaultContainerPortName determines the default port name, according the controller strategy used. func (e *Environment) determineDefaultContainerPortName() string { controller, err := e.DetermineControllerStrategy() if err != nil { log.WithValues("Function", "trait.determineDefaultContainerPortName").Errorf(err, "could not determine controller strategy, using default deployment container name") return defaultContainerPortName } if controller == ControllerStrategyKnativeService { return defaultKnativeContainerPortName } return defaultContainerPortName } func (e *Environment) getControllerStrategyChoosers() []ControllerStrategySelector { var res []ControllerStrategySelector for _, t := range e.ConfiguredTraits { if cc, ok := t.(ControllerStrategySelector); ok { res = append(res, cc) } } sort.Slice(res, func(i, j int) bool { return res[i].ControllerStrategySelectorOrder() < res[j].ControllerStrategySelectorOrder() }) return res } // GetIntegrationPodSpec return the Integration Template Pod Specification, regardless of the deployment strategy. func (e *Environment) GetIntegrationPodSpec() *corev1.PodSpec { // Deployment deployment := e.Resources.GetDeployment(func(d *appsv1.Deployment) bool { return d.Name == e.Integration.Name }) if deployment != nil { return &deployment.Spec.Template.Spec } // Knative service knativeService := e.Resources.GetKnativeService(func(s *serving.Service) bool { return s.Name == e.Integration.Name }) if knativeService != nil { return &knativeService.Spec.Template.Spec.PodSpec } // Cronjob cronJob := e.Resources.GetCronJob(func(c *batchv1.CronJob) bool { return c.Name == e.Integration.Name }) if cronJob != nil { return &cronJob.Spec.JobTemplate.Spec.Template.Spec } return nil } func (e *Environment) DetermineCatalogNamespace() string { // Catalog is expected to be together with the platform if e.Platform != nil && e.Platform.Namespace != "" { return e.Platform.Namespace } if e.Integration != nil && e.Integration.Status.IntegrationKit != nil && e.Integration.Status.IntegrationKit.Namespace != "" { return e.Integration.Status.IntegrationKit.Namespace } if e.Integration != nil && e.Integration.Spec.IntegrationKit != nil && e.Integration.Spec.IntegrationKit.Namespace != "" { return e.Integration.Spec.IntegrationKit.Namespace } if e.IntegrationKit != nil && e.IntegrationKit.Namespace != "" { return e.IntegrationKit.Namespace } if e.Integration != nil && e.Integration.Namespace != "" { return e.Integration.Namespace } return "" } func getVolume(volName, storageType, storageName, filterKey, filterValue string) *corev1.Volume { items := convertToKeyToPath(filterKey, filterValue) volume := corev1.Volume{ Name: volName, VolumeSource: corev1.VolumeSource{}, } switch storageType { case configmapStorageType: volume.VolumeSource.ConfigMap = &corev1.ConfigMapVolumeSource{ LocalObjectReference: corev1.LocalObjectReference{ Name: storageName, }, Items: items, } case secretStorageType: volume.VolumeSource.Secret = &corev1.SecretVolumeSource{ SecretName: storageName, Items: items, } case pvcStorageType: volume.VolumeSource.PersistentVolumeClaim = &corev1.PersistentVolumeClaimVolumeSource{ ClaimName: storageName, } } return &volume } func getMount(volName, mountPath, subPath string, readOnly bool) *corev1.VolumeMount { mount := corev1.VolumeMount{ Name: volName, MountPath: mountPath, ReadOnly: readOnly, } if subPath != "" { mount.SubPath = subPath } return &mount } func convertToKeyToPath(k, v string) []corev1.KeyToPath { if k == "" { return nil } if v == "" { v = k } kp := []corev1.KeyToPath{ { Key: k, Path: v, }, } return kp } func getMountPoint(resourceName string, mountPoint string, storagetype, resourceType string) string { if mountPoint != "" { return mountPoint } if resourceType == "data" { defaultResourceMountPoint := camel.ResourcesConfigmapsMountPath if storagetype == secretStorageType { defaultResourceMountPoint = camel.ResourcesSecretsMountPath } return filepath.Join(defaultResourceMountPoint, resourceName) } defaultMountPoint := camel.ConfigConfigmapsMountPath if storagetype == secretStorageType { defaultMountPoint = camel.ConfigSecretsMountPath } return filepath.Join(defaultMountPoint, resourceName) } type variable struct { Name, Value string } func (e *Environment) collectConfigurationPairs(configurationType string) []variable { return collectConfigurationPairs(configurationType, e.Platform, e.IntegrationKit, e.Integration) } func (e *Environment) GetIntegrationContainerName() string { containerName := defaultContainerName if dt := e.Catalog.GetTrait(containerTraitID); dt != nil { if ct, ok := dt.(*containerTrait); ok { containerName = ct.getContainerName() } } return containerName } // Indicates whether the given source is embedded in the final binary. func (e *Environment) isEmbedded(source v1.SourceSpec) bool { if dt := e.Catalog.GetTrait(quarkusTraitID); dt != nil { if qt, ok := dt.(*quarkusTrait); ok { return qt.isEmbedded(e, source) } } return false } func (e *Environment) GetIntegrationContainer() *corev1.Container { containerName := e.GetIntegrationContainerName() return e.Resources.GetContainerByName(containerName) } func (e *Environment) getIntegrationContainerPort() *corev1.ContainerPort { container := e.GetIntegrationContainer() if container == nil { return nil } // User specified port name portName := "" if t := e.Catalog.GetTrait(containerTraitID); t != nil { if ct, ok := t.(*containerTrait); ok { portName = ct.PortName } } // default port name (may change according the controller strategy, ie Knative) if portName == "" { portName = e.determineDefaultContainerPortName() } for i, port := range container.Ports { if port.Name == portName { return &container.Ports[i] } } return nil } // createContainerPort creates a new container port with values taken from Container trait or default. func (e *Environment) createContainerPort() *corev1.ContainerPort { var name string var port int32 if t := e.Catalog.GetTrait(containerTraitID); t != nil { if ct, ok := t.(*containerTrait); ok { name = ct.PortName port = ct.getPort() } } if name == "" { name = e.determineDefaultContainerPortName() } return &corev1.ContainerPort{ Name: name, ContainerPort: port, Protocol: corev1.ProtocolTCP, } } // CapabilityPropertyKey returns the key or expand any variable provided in it. vars variable contain the // possible dynamic values to use. func CapabilityPropertyKey(camelPropertyKey string, vars map[string]string) string { if capabilityDynamicProperty.MatchString(camelPropertyKey) && vars != nil { match := capabilityDynamicProperty.FindStringSubmatch(camelPropertyKey) if len(match) < 2 { // Should not happen, but fallback to the key not expanded instead of panic if it comes to happen return camelPropertyKey } return strings.ReplaceAll(camelPropertyKey, match[1], vars[match[2]]) } return camelPropertyKey } // ConsumeMeta is used to consume metadata information coming from Integration sources. If no sources available, // would return false. When consuming from meta you should make sure that the configuration is stored in the // status traits by setting each trait configuration when in "auto" mode. // originalSourcesOnly flag indicates if you want to use only the sources provided originally to the Integration, otherwise // it will consume all sources, also the one autogenerated by the operator. func (e *Environment) ConsumeMeta(originalSourcesOnly bool, consumeMeta func(metadata.IntegrationMetadata) bool) (bool, error) { return e.consumeSourcesMeta(originalSourcesOnly, nil, consumeMeta) } // consumeSourcesMeta is used to consume both sources and metadata information coming from Integration sources. // If no sources available would return false. func (e *Environment) consumeSourcesMeta( originalSourcesOnly bool, consumeSources func(sources []v1.SourceSpec) bool, consumeMeta func(metadata.IntegrationMetadata) bool) (bool, error) { var sources []v1.SourceSpec var err error if sources, err = resolveIntegrationSources(e.Ctx, e.Client, e.Integration, originalSourcesOnly, e.Resources); err != nil { return false, err } if len(sources) < 1 { // No sources available return false, nil } if consumeSources != nil { consumeSources(sources) } if e.CamelCatalog == nil { return false, fmt.Errorf("cannot extract metadata from sources. Camel Catalog is null") } meta, err := metadata.ExtractAll(e.CamelCatalog, sources) if err != nil { return false, err } return consumeMeta(meta), nil } func (e *Environment) appendCloudPropertiesLocation(cloudPropertiesLocation string) { if e.ApplicationProperties["camel.main.cloud-properties-location"] == "" { e.ApplicationProperties["camel.main.cloud-properties-location"] = cloudPropertiesLocation } else { e.ApplicationProperties["camel.main.cloud-properties-location"] += "," + cloudPropertiesLocation } }