pkg/trait/mount.go (381 lines of code) (raw):
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package trait
import (
"fmt"
"path"
"path/filepath"
"strings"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
serving "knative.dev/serving/pkg/apis/serving/v1"
v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
traitv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1/trait"
"github.com/apache/camel-k/v2/pkg/util/boolean"
"github.com/apache/camel-k/v2/pkg/util/camel"
"github.com/apache/camel-k/v2/pkg/util/kubernetes"
"github.com/apache/camel-k/v2/pkg/util/log"
"github.com/apache/camel-k/v2/pkg/util/property"
utilResource "github.com/apache/camel-k/v2/pkg/util/resource"
"k8s.io/apimachinery/pkg/api/resource"
)
const (
mountTraitID = "mount"
mountTraitOrder = 1610
)
type mountTrait struct {
BasePlatformTrait
traitv1.MountTrait `property:",squash"`
}
func newMountTrait() Trait {
return &mountTrait{
// Must follow immediately the container trait
BasePlatformTrait: NewBasePlatformTrait(mountTraitID, mountTraitOrder),
}
}
func (t *mountTrait) Configure(e *Environment) (bool, *TraitCondition, error) {
if e.Integration == nil || !e.IntegrationInRunningPhases() {
return false, nil, nil
}
// Validate resources and pvcs
for _, c := range t.Configs {
if !strings.HasPrefix(c, "configmap:") && !strings.HasPrefix(c, "secret:") {
return false, nil, fmt.Errorf("unsupported config %s, must be a configmap or secret resource", c)
}
}
for _, r := range t.Resources {
if !strings.HasPrefix(r, "configmap:") && !strings.HasPrefix(r, "secret:") {
return false, nil, fmt.Errorf("unsupported resource %s, must be a configmap or secret resource", r)
}
}
return true, nil, nil
}
func (t *mountTrait) Apply(e *Environment) error {
container := e.GetIntegrationContainer()
if container == nil {
return fmt.Errorf("unable to find integration container: %s", e.Integration.Name)
}
var volumes *[]corev1.Volume
visited := false
// Deployment
if err := e.Resources.VisitDeploymentE(func(deployment *appsv1.Deployment) error {
volumes = &deployment.Spec.Template.Spec.Volumes
visited = true
return nil
}); err != nil {
return err
}
// Knative Service
if err := e.Resources.VisitKnativeServiceE(func(service *serving.Service) error {
volumes = &service.Spec.ConfigurationSpec.Template.Spec.Volumes
visited = true
return nil
}); err != nil {
return err
}
// CronJob
if err := e.Resources.VisitCronJobE(func(cron *batchv1.CronJob) error {
volumes = &cron.Spec.JobTemplate.Spec.Template.Spec.Volumes
visited = true
return nil
}); err != nil {
return err
}
if visited {
// Volumes declared in the trait config/resource options
// as this func influences the application.properties
// must be set as the first one to execute
err := t.configureVolumesAndMounts(e, volumes, &container.VolumeMounts)
if err != nil {
return err
}
// Here we configure the application.properties
t.addSourcesProperties(e)
if props, err := t.computeApplicationProperties(e); err != nil {
return err
} else if props != nil {
e.Resources.Add(props)
}
// Volumes declared in the Integration resources (including the application.properties Configmap)
t.configureCamelVolumesAndMounts(e, volumes, &container.VolumeMounts)
}
return nil
}
// configureVolumesAndMounts is in charge to mount volumes and mounts coming from the trait configuration.
func (t *mountTrait) configureVolumesAndMounts(e *Environment, vols *[]corev1.Volume, mnts *[]corev1.VolumeMount) error {
for _, c := range t.Configs {
if conf, parseErr := utilResource.ParseConfig(c); parseErr == nil {
// Let Camel parse these resources as properties
destFilePath := t.mountResource(vols, mnts, conf)
e.appendCloudPropertiesLocation(destFilePath)
} else {
return parseErr
}
}
for _, r := range t.Resources {
if res, parseErr := utilResource.ParseResource(r); parseErr == nil {
t.mountResource(vols, mnts, res)
} else {
return parseErr
}
}
for _, v := range t.Volumes {
volume, volumeMount, parseErr := ParseAndCreateVolume(e, v)
if parseErr != nil {
return parseErr
}
*vols = append(*vols, *volume)
*mnts = append(*mnts, *volumeMount)
}
for _, v := range t.EmptyDirs {
volume, volumeMount, parseErr := ParseEmptyDirVolume(v)
if parseErr != nil {
return parseErr
}
*vols = append(*vols, *volume)
*mnts = append(*mnts, *volumeMount)
}
return nil
}
// configureCamelVolumesAndMounts is in charge to mount volumes and mounts coming from Camel configuration
// (ie, sources, properties, kamelets, etcetera).
func (t *mountTrait) configureCamelVolumesAndMounts(e *Environment, vols *[]corev1.Volume, mnts *[]corev1.VolumeMount) {
// Sources index
idx := 0
// Configmap index (may differ as generated sources can have a different name)
cmx := 0
for _, s := range e.Integration.AllSources() {
// We don't process routes embedded (native) or Kamelets
if e.isEmbedded(s) || s.IsGeneratedFromKamelet() {
continue
}
// Routes are copied under /etc/camel/sources and discovered by the runtime accordingly
cmName := fmt.Sprintf("%s-source-%03d", e.Integration.Name, cmx)
if s.ContentRef != "" {
cmName = s.ContentRef
}
cmKey := "content"
if s.ContentKey != "" {
cmKey = s.ContentKey
}
resName := strings.TrimPrefix(s.Name, "/")
refName := fmt.Sprintf("i-source-%03d", idx)
resPath := filepath.Join(camel.SourcesMountPath, resName)
vol := getVolume(refName, "configmap", cmName, cmKey, resName)
mnt := getMount(refName, resPath, resName, true)
*vols = append(*vols, *vol)
*mnts = append(*mnts, *mnt)
idx++
if s.ContentRef == "" {
cmx++
}
}
// Resources (likely application properties or kamelets)
if e.Resources != nil {
e.Resources.VisitConfigMap(func(configMap *corev1.ConfigMap) {
switch configMap.Labels[kubernetes.ConfigMapTypeLabel] {
case CamelPropertiesType:
// Camel properties
propertiesType := configMap.Labels["camel.apache.org/properties.type"]
resName := propertiesType + ".properties"
var mountPath string
switch propertiesType {
case "application":
mountPath = filepath.Join(camel.BasePath, resName)
case "user":
mountPath = filepath.Join(camel.ConfDPath, resName)
}
if propertiesType != "" {
refName := propertiesType + "-properties"
vol := getVolume(refName, "configmap", configMap.Name, "application.properties", resName)
mnt := getMount(refName, mountPath, resName, true)
*vols = append(*vols, *vol)
*mnts = append(*mnts, *mnt)
} else {
log.WithValues("Function", "trait.configureVolumesAndMounts").Infof("Warning: could not determine camel properties type %s", propertiesType)
}
case KameletBundleType:
// Kamelets bundle configmap
kameletMountPoint := configMap.Annotations[kameletMountPointAnnotation]
refName := KameletBundleType
vol := getVolume(refName, "configmap", configMap.Name, "", "")
mnt := getMount(refName, kameletMountPoint, "", true)
*vols = append(*vols, *vol)
*mnts = append(*mnts, *mnt)
}
})
}
}
// mountResource add the resource to volumes and mounts and return the final path where the resource is mounted.
func (t *mountTrait) mountResource(vols *[]corev1.Volume, mnts *[]corev1.VolumeMount, conf *utilResource.Config) string {
refName := kubernetes.SanitizeLabel(conf.Name())
dstDir := conf.DestinationPath()
dstFile := ""
if conf.DestinationPath() != "" {
if conf.Key() != "" {
dstFile = filepath.Base(conf.DestinationPath())
} else {
dstFile = conf.Key()
}
}
vol := getVolume(refName, string(conf.StorageType()), conf.Name(), conf.Key(), dstFile)
mntPath := getMountPoint(conf.Name(), dstDir, string(conf.StorageType()), string(conf.ContentType()))
readOnly := true
if conf.StorageType() == utilResource.StorageTypePVC {
readOnly = false
}
mnt := getMount(refName, mntPath, dstFile, readOnly)
*vols = append(*vols, *vol)
*mnts = append(*mnts, *mnt)
return mnt.MountPath
}
// ParseEmptyDirVolume will parse and return an empty-dir volume.
func ParseEmptyDirVolume(item string) (*corev1.Volume, *corev1.VolumeMount, error) {
volumeParts := strings.Split(item, ":")
if len(volumeParts) != 2 && len(volumeParts) != 3 {
return nil, nil, fmt.Errorf("could not match emptyDir volume as %s", item)
}
refName := kubernetes.SanitizeLabel(volumeParts[0])
sizeLimit := "500Mi"
if len(volumeParts) == 3 {
sizeLimit = volumeParts[2]
}
parsed, err := resource.ParseQuantity(sizeLimit)
if err != nil {
return nil, nil, fmt.Errorf("could not parse sizeLimit from emptyDir volume: %s", volumeParts[2])
}
volume := &corev1.Volume{
Name: refName,
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{
SizeLimit: &parsed,
},
},
}
volumeMount := getMount(refName, volumeParts[1], "", false)
return volume, volumeMount, nil
}
// ParseAndCreateVolume will parse a volume configuration. If the volume does not exist it tries to create one based on the storage
// class configuration provided or default.
// item is expected to be as: name:path/to/mount<:size:accessMode<:storageClassName>>.
func ParseAndCreateVolume(e *Environment, item string) (*corev1.Volume, *corev1.VolumeMount, error) {
volumeParts := strings.Split(item, ":")
volumeName := volumeParts[0]
pvc, err := kubernetes.LookupPersistentVolumeClaim(e.Ctx, e.Client, e.Integration.Namespace, volumeName)
if err != nil {
return nil, nil, err
}
var volume *corev1.Volume
if pvc == nil {
if len(volumeParts) == 2 {
return nil, nil, fmt.Errorf("volume %s does not exist. "+
"Make sure to provide one or configure a dynamic PVC as trait volume configuration pvcName:path/to/mount:size:accessMode<:storageClassName>",
volumeName,
)
}
if err = createPVC(e, volumeParts); err != nil {
return nil, nil, err
}
}
volume = &corev1.Volume{
Name: kubernetes.SanitizeLabel(volumeName),
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: volumeName,
},
},
}
volumeMount := getMount(volumeName, volumeParts[1], "", false)
return volume, volumeMount, nil
}
// createPVC is in charge to create a PersistentVolumeClaim based on the configuration provided. Or it fail within the intent.
// volumeParts is expected to be as: name, path/to/mount, size, accessMode, <storageClassName>.
func createPVC(e *Environment, volumeParts []string) error {
if len(volumeParts) < 4 || len(volumeParts) > 5 {
return fmt.Errorf(
"volume mount syntax error, must be name:path/to/mount:size:accessMode<:storageClassName> was %s",
strings.Join(volumeParts, ":"),
)
}
volumeName := volumeParts[0]
size := volumeParts[2]
accessMode := volumeParts[3]
sizeQty, err := resource.ParseQuantity(size)
if err != nil {
return fmt.Errorf("could not parse size %s, %s", size, err.Error())
}
var sc *storagev1.StorageClass
//nolint: nestif
if len(volumeParts) == 5 {
scName := volumeParts[4]
sc, err = kubernetes.LookupStorageClass(e.Ctx, e.Client, e.Integration.Namespace, scName)
if err != nil {
return fmt.Errorf("error looking up for StorageClass %s, %w", scName, err)
}
if sc == nil {
return fmt.Errorf("could not find any %s StorageClass", scName)
}
} else {
sc, err = kubernetes.LookupDefaultStorageClass(e.Ctx, e.Client)
if err != nil {
return fmt.Errorf("error looking up for default StorageClass, %w", err)
}
if sc == nil {
return fmt.Errorf("could not find any default StorageClass")
}
}
pvc := kubernetes.NewPersistentVolumeClaim(e.Integration.Namespace, volumeName, sc.Name, sizeQty, corev1.PersistentVolumeAccessMode(accessMode))
if err := e.Client.Create(e.Ctx, pvc); err != nil {
return err
}
return nil
}
// computeApplicationProperties is in charge to configure the configmap containing Camel application.properties.
func (t *mountTrait) computeApplicationProperties(e *Environment) (*corev1.ConfigMap, error) {
// application properties
applicationProperties, err := property.EncodePropertyFile(e.ApplicationProperties)
if err != nil {
return nil, fmt.Errorf("could not compute application properties: %w", err)
}
if applicationProperties != "" {
return &corev1.ConfigMap{
TypeMeta: metav1.TypeMeta{
Kind: "ConfigMap",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: e.Integration.Name + "-application-properties",
Namespace: e.Integration.Namespace,
Labels: map[string]string{
v1.IntegrationLabel: e.Integration.Name,
"camel.apache.org/properties.type": "application",
kubernetes.ConfigMapTypeLabel: CamelPropertiesType,
},
},
Data: map[string]string{
"application.properties": applicationProperties,
},
}, nil
}
return nil, nil
}
// addSourcesProperties is in charge to add the sources in the application.properties required by Camel K Runtime.
//
//nolint:nestif
func (t *mountTrait) addSourcesProperties(e *Environment) {
if e.ApplicationProperties == nil {
e.ApplicationProperties = make(map[string]string)
}
if e.CamelCatalog.GetRuntimeProvider() == v1.RuntimeProviderPlainQuarkus {
sourceLocationEnabled := false
for _, s := range e.Integration.AllSources() {
// We don't process routes embedded (native) or Kamelets
if e.isEmbedded(s) || s.IsGeneratedFromKamelet() {
continue
}
sourceLocationEnabled = true
break
}
if sourceLocationEnabled {
e.ApplicationProperties["camel.main.source-location-enabled"] = boolean.TrueString
e.ApplicationProperties["camel.main.routes-include-pattern"] = fmt.Sprintf("file:%s/**", camel.SourcesMountPath)
}
} else {
idx := 0
for _, s := range e.Integration.AllSources() {
// We don't process routes embedded (native) or Kamelets
if e.isEmbedded(s) || s.IsGeneratedFromKamelet() {
continue
}
srcName := strings.TrimPrefix(filepath.ToSlash(s.Name), "/")
src := "file:" + path.Join(filepath.ToSlash(camel.SourcesMountPath), srcName)
e.ApplicationProperties[fmt.Sprintf("camel.k.sources[%d].location", idx)] = src
simpleName := srcName
if strings.Contains(srcName, ".") {
simpleName = srcName[0:strings.Index(srcName, ".")]
}
e.ApplicationProperties[fmt.Sprintf("camel.k.sources[%d].name", idx)] = simpleName
for pid, p := range s.PropertyNames {
e.ApplicationProperties[fmt.Sprintf("camel.k.sources[%d].property-names[%d]", idx, pid)] = p
}
if s.Type != "" {
e.ApplicationProperties[fmt.Sprintf("camel.k.sources[%d].type", idx)] = string(s.Type)
}
if s.InferLanguage() != "" {
e.ApplicationProperties[fmt.Sprintf("camel.k.sources[%d].language", idx)] = string(s.InferLanguage())
}
if s.Loader != "" {
e.ApplicationProperties[fmt.Sprintf("camel.k.sources[%d].loader", idx)] = s.Loader
}
if s.Compression {
e.ApplicationProperties[fmt.Sprintf("camel.k.sources[%d].compressed", idx)] = boolean.TrueString
}
idx++
}
}
}