pkg/install/cluster.go (329 lines of code) (raw):
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package install
import (
"context"
"errors"
"fmt"
"strconv"
"time"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/apache/camel-k/v2/pkg/client"
"github.com/apache/camel-k/v2/pkg/resources"
"github.com/apache/camel-k/v2/pkg/util/knative"
"github.com/apache/camel-k/v2/pkg/util/kubernetes"
ctrl "sigs.k8s.io/controller-runtime/pkg/client"
)
func SetupClusterWideResourcesOrCollect(
ctx context.Context, clientProvider client.Provider,
collection *kubernetes.Collection, clusterType string, force bool,
) error {
// Get a client to install the CRD
c, err := clientProvider.Get()
if err != nil {
return err
}
// Install CRDs
if err := installCRDs(ctx, c, collection, force); err != nil {
return err
}
// Don't wait if we're just collecting resources
if collection == nil {
// Wait for all CRDs to be installed before proceeding
if err := WaitForAllCrdInstallation(ctx, clientProvider, 25*time.Second); err != nil {
return err
}
}
// Install ClusterRoles
return installClusterRoles(ctx, c, collection, clusterType)
}
func installClusterRoles(
ctx context.Context, c client.Client, collection *kubernetes.Collection, clusterType string,
) error {
// ClusterRole: camel-k-edit
ok, err := isClusterRoleInstalled(ctx, c, "camel-k-edit")
if err != nil {
return err
}
if !ok || collection != nil {
err := installResource(ctx, c, collection, "/config/rbac/user-cluster-role.yaml")
if err != nil {
return err
}
}
// ClusterRole: camel-k-operator-custom-resource-definitions
ok, err = isClusterRoleInstalled(ctx, c, "camel-k-operator-custom-resource-definitions")
if err != nil {
return err
}
if !ok {
if err := installResource(ctx, c, collection,
"/config/rbac/operator-cluster-role-custom-resource-definitions.yaml"); err != nil {
return err
}
}
// === For OpenShift ===
// ClusterRole: camel-k-operator-console-openshift
isOpenShift, err := isOpenShift(c, clusterType)
if err != nil {
return err
}
if isOpenShift {
ok, err := isClusterRoleInstalled(ctx, c, "camel-k-operator-console-openshift")
if err != nil {
return err
}
if !ok || collection != nil {
err := installResource(ctx, c, collection, "/config/rbac/openshift/operator-cluster-role-console-openshift.yaml")
if err != nil {
return err
}
}
}
// === For Knative ===
// ClusterRole: camel-k-operator-bind-addressable-resolver
isKnative, err := knative.IsEventingInstalled(c)
if err != nil {
return err
}
if isKnative {
ok, err := isClusterRoleInstalled(ctx, c, "camel-k-operator-bind-addressable-resolver")
if err != nil {
return err
}
if !ok || collection != nil {
err := installResource(ctx, c, collection, "/config/rbac/operator-cluster-role-addressable-resolver.yaml")
if err != nil {
return err
}
}
}
return nil
}
func installCRDs(ctx context.Context, c client.Client, collection *kubernetes.Collection, force bool) error {
isAPIExtensionsV1 := true
if _, err := c.Discovery().ServerResourcesForGroupVersion("apiextensions.k8s.io/v1"); err != nil {
if k8serrors.IsNotFound(err) {
isAPIExtensionsV1 = false
} else {
return err
}
}
// Convert the CRD to apiextensions.k8s.io/v1beta1 in case v1 is not available.
// This is mainly required to support OpenShift 3, and older versions of Kubernetes.
// It can be removed as soon as these versions are not supported anymore.
if err := apiextensionsv1.AddToScheme(c.GetScheme()); err != nil {
return err
}
if !isAPIExtensionsV1 {
if err := apiextensionsv1beta1.AddToScheme(c.GetScheme()); err != nil {
return err
}
}
v1beta1Customizer := downgradeToCRDv1beta1(isAPIExtensionsV1)
// Install CRD for Integration Platform (if needed)
if err := installCRD(ctx, c, "IntegrationPlatform", "v1", "camel.apache.org_integrationplatforms.yaml",
v1beta1Customizer, collection, force); err != nil {
return err
}
// Install CRD for Integration Profile (if needed)
if err := installCRD(ctx, c, "IntegrationProfile", "v1", "camel.apache.org_integrationprofiles.yaml",
v1beta1Customizer, collection, force); err != nil {
return err
}
// Install CRD for Integration Kit (if needed)
if err := installCRD(ctx, c, "IntegrationKit", "v1", "camel.apache.org_integrationkits.yaml",
v1beta1Customizer, collection, force); err != nil {
return err
}
// Install CRD for Integration (if needed)
if err := installCRD(ctx, c, "Integration", "v1", "camel.apache.org_integrations.yaml",
v1beta1Customizer, collection, force); err != nil {
return err
}
// Install CRD for Camel Catalog (if needed)
if err := installCRD(ctx, c, "CamelCatalog", "v1", "camel.apache.org_camelcatalogs.yaml",
v1beta1Customizer, collection, force); err != nil {
return err
}
// Install CRD for Build (if needed)
if err := installCRD(ctx, c, "Build", "v1", "camel.apache.org_builds.yaml",
v1beta1Customizer, collection, force); err != nil {
return err
}
// Install CRD for Pipe (if needed)
if err := installCRD(ctx, c, "Pipe", "v1", "camel.apache.org_pipes.yaml",
v1beta1Customizer, collection, force); err != nil {
return err
}
return nil
}
func downgradeToCRDv1beta1(isAPIExtensionsV1 bool) ResourceCustomizer {
return func(object ctrl.Object) ctrl.Object {
// Remove default values in v1beta1 Integration and Pipe CRDs,
removeDefaultFromCrd := func(crd *apiextensionsv1beta1.CustomResourceDefinition, property string) {
defaultValue := apiextensionsv1beta1.JSONSchemaProps{
Default: nil,
}
if crd.Name == "integrations.camel.apache.org" {
crd.Spec.Validation.OpenAPIV3Schema.
Properties["spec"].Properties["template"].Properties["spec"].Properties[property].Items.Schema.
Properties["ports"].Items.Schema.Properties["protocol"] = defaultValue
}
if crd.Name == "pipes.camel.apache.org" {
crd.Spec.Validation.OpenAPIV3Schema.Properties["spec"].Properties["integration"].Properties["template"].
Properties["spec"].Properties[property].Items.Schema.Properties["ports"].Items.Schema.
Properties["protocol"] = defaultValue
}
}
if !isAPIExtensionsV1 {
v1Crd, ok := object.(*apiextensionsv1.CustomResourceDefinition)
if !ok {
return nil
}
v1beta1Crd := &apiextensionsv1beta1.CustomResourceDefinition{}
crd := &apiextensions.CustomResourceDefinition{}
err := apiextensionsv1.Convert_v1_CustomResourceDefinition_To_apiextensions_CustomResourceDefinition(v1Crd, crd, nil)
if err != nil {
return nil
}
err = apiextensionsv1beta1.Convert_apiextensions_CustomResourceDefinition_To_v1beta1_CustomResourceDefinition(crd, v1beta1Crd, nil)
if err != nil {
return nil
}
removeDefaultFromCrd(v1beta1Crd, "ephemeralContainers")
removeDefaultFromCrd(v1beta1Crd, "containers")
removeDefaultFromCrd(v1beta1Crd, "initContainers")
return v1beta1Crd
}
return object
}
}
func WaitForAllCrdInstallation(ctx context.Context, clientProvider client.Provider, timeout time.Duration) error {
deadline := time.Now().Add(timeout)
for {
var c client.Client
var err error
if c, err = clientProvider.Get(); err != nil {
return err
}
var errno int
if errno, err = areAllCrdInstalled(c); err != nil {
return err
}
if errno == 0 {
return nil
}
// Check after 2 seconds if not expired
if time.Now().After(deadline) {
return fmt.Errorf(
"cannot check CRD installation after %s seconds (errno %d)",
strconv.FormatInt(timeout.Nanoseconds()/1000000000, 10),
errno)
}
time.Sleep(2 * time.Second)
}
}
// TODO those error codes are a little bit obscure, what do they mean ? it would be better to return an custom error.
func areAllCrdInstalled(c client.Client) (int, error) {
if ok, err := isCrdInstalled(c, "IntegrationPlatform", "v1"); err != nil {
return 1, fmt.Errorf("error installing IntegrationPlatform CRDs: %w", err)
} else if !ok {
return 1, nil
}
if ok, err := isCrdInstalled(c, "IntegrationProfile", "v1"); err != nil {
return 1, fmt.Errorf("error installing IntegrationProfile CRDs: %w", err)
} else if !ok {
return 1, nil
}
if ok, err := isCrdInstalled(c, "IntegrationKit", "v1"); err != nil {
return 2, fmt.Errorf("error installing IntegrationKit CRDs: %w", err)
} else if !ok {
return 2, nil
}
if ok, err := isCrdInstalled(c, "Integration", "v1"); err != nil {
return 3, fmt.Errorf("error installing Integration CRDs: %w", err)
} else if !ok {
return 3, nil
}
if ok, err := isCrdInstalled(c, "CamelCatalog", "v1"); err != nil {
return 4, fmt.Errorf("error installing CamelCatalog CRDs: %w", err)
} else if !ok {
return 4, nil
}
if ok, err := isCrdInstalled(c, "Build", "v1"); err != nil {
return 5, fmt.Errorf("error installing Build CRDs: %w", err)
} else if !ok {
return 5, nil
}
if ok, err := isCrdInstalled(c, "Kamelet", "v1"); err != nil {
return 6, fmt.Errorf("error installing Kamelet CRDs: %w", err)
} else if !ok {
return 6, nil
}
if ok, err := isCrdInstalled(c, "Pipe", "v1"); err != nil {
return 8, fmt.Errorf("error installing Pipe CRDs: %w", err)
} else if !ok {
return 8, nil
}
return 0, nil
}
func isCrdInstalled(c client.Client, kind string, version string) (bool, error) {
lst, err := c.Discovery().ServerResourcesForGroupVersion(fmt.Sprintf("camel.apache.org/%s", version))
if err != nil && k8serrors.IsNotFound(err) {
return false, nil
} else if err != nil {
return false, err
}
for _, res := range lst.APIResources {
if res.Kind == kind {
return true, nil
}
}
return false, nil
}
//nolint:unparam
func installCRD(
ctx context.Context, c client.Client, kind string, version string, resourceName string,
customizer ResourceCustomizer, collection *kubernetes.Collection, force bool,
) error {
content, err := resources.ResourceAsString("config/crd/bases/" + resourceName)
if err != nil {
return err
}
crd, err := kubernetes.LoadResourceFromYaml(c.GetScheme(), content)
if err != nil {
return err
}
crd = customizer(crd)
if crd == nil {
// The conversion has failed
return errors.New("cannot convert " + resourceName + " CRD to apiextensions.k8s.io/v1beta1")
}
if collection != nil {
collection.Add(crd)
return nil
}
installed, err := isCrdInstalled(c, kind, version)
if err != nil {
return err
}
if installed && !force {
return nil
}
_, err = kubernetes.ReplaceResource(ctx, c, crd)
return err
}
func isClusterRoleInstalled(ctx context.Context, c client.Client, name string) (bool, error) {
clusterRole := rbacv1.ClusterRole{
TypeMeta: metav1.TypeMeta{
Kind: "ClusterRole",
APIVersion: "rbac.authorization.k8s.io/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
}
return isResourceInstalled(ctx, c, &clusterRole)
}
func isResourceInstalled(ctx context.Context, c client.Client, object ctrl.Object) (bool, error) {
err := c.Get(ctx, ctrl.ObjectKeyFromObject(object), object)
if err != nil && k8serrors.IsNotFound(err) {
return false, nil
} else if err != nil {
return false, err
}
return true, nil
}
func installResource(ctx context.Context, c client.Client, collection *kubernetes.Collection, resource string) error {
content, err := resources.ResourceAsString(resource)
if err != nil {
return err
}
obj, err := kubernetes.LoadResourceFromYaml(c.GetScheme(), content)
if err != nil {
return err
}
if collection != nil {
collection.Add(obj)
return nil
}
return c.Create(ctx, obj)
}