deploy/kubernetes/operator/pkg/webhook/syncer/syncer.go (204 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package syncer
import (
"context"
"reflect"
"time"
"golang.org/x/sync/errgroup"
admissionv1 "k8s.io/api/admissionregistration/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/manager"
unifflev1alpha1 "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
webhookconstants "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/constants"
)
// Compile-time assertion that *configSyncer satisfies ConfigSyncer.
var _ ConfigSyncer = (*configSyncer)(nil)

// ConfigSyncer keeps the ValidatingWebhookConfiguration and
// MutatingWebhookConfiguration objects up to date. It embeds manager.Runnable,
// so its whole contract is the Start method of that interface.
type ConfigSyncer interface {
	manager.Runnable
}
// NewConfigSyncer builds a ConfigSyncer from the CA certificate, the name of
// the service exposing the webhook endpoints, and a Kubernetes client.
func NewConfigSyncer(caCert []byte, externalService string,
	kubeClient kubernetes.Interface) ConfigSyncer {
	var syncer ConfigSyncer = newConfigSyncer(caCert, externalService, kubeClient)
	return syncer
}
// newConfigSyncer returns the concrete *configSyncer holding the given CA
// certificate, webhook service name and Kubernetes client.
func newConfigSyncer(caCert []byte, externalService string,
	kubeClient kubernetes.Interface) *configSyncer {
	cs := new(configSyncer)
	cs.caCert = caCert
	cs.externalService = externalService
	cs.kubeClient = kubeClient
	return cs
}
// configSyncer is the default ConfigSyncer implementation.
type configSyncer struct {
	// caCert is placed into the CABundle of every generated webhook client config.
	caCert []byte
	// externalService is the name of the Service referenced by the generated webhooks.
	externalService string
	// kubeClient is used to get, create and update the webhook configuration objects.
	kubeClient kubernetes.Interface
}
// Start runs the sync loop until ctx is cancelled.
//
// A sync pass is executed immediately and then again one minute after each
// pass finishes. A failed pass is only logged; the loop keeps running and
// retries on the next pass. The wait between passes is interruptible, so
// cancelling ctx makes Start return promptly instead of after the remainder
// of the sleep. Start always returns nil.
func (cs *configSyncer) Start(ctx context.Context) error {
	// syncInterval is the pause between the end of one pass and the next.
	const syncInterval = time.Minute

	klog.V(2).Info("config syncer started")
	for {
		// Do not start a new pass once shutdown has been requested.
		if ctx.Err() != nil {
			klog.V(3).Info("stop syncing webhook configurations")
			return nil
		}
		if err := cs.syncWebhookCfg(); err != nil {
			klog.Errorf("sync webhook configuration failed: %v", err)
		}

		// Wait for the next pass, but wake up early on cancellation.
		timer := time.NewTimer(syncInterval)
		select {
		case <-ctx.Done():
			timer.Stop()
			klog.V(3).Info("stop syncing webhook configurations")
			return nil
		case <-timer.C:
		}
	}
}
// syncWebhookCfg generates the desired validating and mutating webhook
// configurations and reconciles both against the cluster concurrently.
// It waits for both reconciliations and returns the first non-nil error, if any.
func (cs *configSyncer) syncWebhookCfg() error {
	desiredVWC, desiredMWC := cs.generateWebhookCfg()

	var group errgroup.Group
	group.Go(func() error { return cs.syncValidatingWebhookCfg(desiredVWC) })
	group.Go(func() error { return cs.syncMutatingWebhookCfg(desiredMWC) })
	return group.Wait()
}
// syncValidatingWebhookCfg reconciles the ValidatingWebhookConfiguration named
// webhookconstants.ComponentName with currentVWC.
//
// If the object does not exist it is created from currentVWC. If it exists and
// its Webhooks differ from currentVWC.Webhooks, only the Webhooks field is
// replaced and the object is updated. Any other Get error is returned as is.
//
// NOTE(review): the comparison is a strict reflect.DeepEqual; if the stored
// object carries fields filled in by the API server, this may report a
// difference on every pass — confirm.
func (cs *configSyncer) syncValidatingWebhookCfg(
	currentVWC *admissionv1.ValidatingWebhookConfiguration) error {
	ctx := context.Background()
	client := cs.kubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations()

	existing, err := client.Get(ctx, webhookconstants.ComponentName, metav1.GetOptions{})
	switch {
	case err == nil:
		// Object exists; compare it below.
	case errors.IsNotFound(err):
		_, err = client.Create(ctx, currentVWC, metav1.CreateOptions{})
		return err
	default:
		return err
	}

	if !reflect.DeepEqual(existing.Webhooks, currentVWC.Webhooks) {
		existing.Webhooks = currentVWC.Webhooks
		_, err = client.Update(ctx, existing, metav1.UpdateOptions{})
		return err
	}
	return nil
}
// syncMutatingWebhookCfg reconciles the MutatingWebhookConfiguration named
// webhookconstants.ComponentName with currentMWC.
//
// If the object does not exist it is created from currentMWC. If it exists and
// its Webhooks differ from currentMWC.Webhooks, only the Webhooks field is
// replaced and the object is updated. Any other Get error is returned as is.
//
// NOTE(review): the comparison is a strict reflect.DeepEqual; if the stored
// object carries fields filled in by the API server, this may report a
// difference on every pass — confirm.
func (cs *configSyncer) syncMutatingWebhookCfg(
	currentMWC *admissionv1.MutatingWebhookConfiguration) error {
	ctx := context.Background()
	client := cs.kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations()

	existing, err := client.Get(ctx, webhookconstants.ComponentName, metav1.GetOptions{})
	switch {
	case err == nil:
		// Object exists; compare it below.
	case errors.IsNotFound(err):
		_, err = client.Create(ctx, currentMWC, metav1.CreateOptions{})
		return err
	default:
		return err
	}

	if !reflect.DeepEqual(existing.Webhooks, currentMWC.Webhooks) {
		existing.Webhooks = currentMWC.Webhooks
		_, err = client.Update(ctx, existing, metav1.UpdateOptions{})
		return err
	}
	return nil
}
// generateWebhookCfg builds the desired ValidatingWebhookConfiguration and
// MutatingWebhookConfiguration. Both are named webhookconstants.ComponentName
// and carry the webhooks produced by generateValidatingWebhooks and
// generateMutatingWebhooks respectively.
func (cs *configSyncer) generateWebhookCfg() (
	*admissionv1.ValidatingWebhookConfiguration, *admissionv1.MutatingWebhookConfiguration) {
	vwc := &admissionv1.ValidatingWebhookConfiguration{}
	vwc.Name = webhookconstants.ComponentName
	vwc.Webhooks = cs.generateValidatingWebhooks()

	mwc := &admissionv1.MutatingWebhookConfiguration{}
	mwc.Name = webhookconstants.ComponentName
	mwc.Webhooks = cs.generateMutatingWebhooks()

	return vwc, mwc
}
// generateValidatingWebhooks returns the two validating webhooks of the
// ValidatingWebhookConfiguration:
//
//   - "webhook.for.shuffle.server": DELETE of core/v1 pods labelled
//     constants.LabelShuffleServer="true", served at ValidatingPodPath with a
//     30 second timeout.
//   - "webhook.for.rss": the operations and resources returned by buildRules,
//     served at ValidatingRssPath.
//
// Both use failure policy Fail, declare no side effects, and target the
// externalService Service in the current namespace with caCert as CA bundle.
func (cs *configSyncer) generateValidatingWebhooks() []admissionv1.ValidatingWebhook {
	failurePolicy := admissionv1.Fail
	sideEffects := admissionv1.SideEffectClassNone

	// clientConfigFor points the API server at the given path of the webhook service.
	clientConfigFor := func(path string) admissionv1.WebhookClientConfig {
		return admissionv1.WebhookClientConfig{
			CABundle: cs.caCert,
			Service: &admissionv1.ServiceReference{
				Name:      cs.externalService,
				Namespace: utils.GetCurrentNamespace(),
				Path:      pointer.StringPtr(path),
			},
		}
	}

	podDeleteRule := admissionv1.RuleWithOperations{
		Operations: []admissionv1.OperationType{admissionv1.Delete},
		Rule: admissionv1.Rule{
			APIGroups:   []string{corev1.GroupName},
			APIVersions: []string{"v1"},
			Resources:   []string{"pods"},
		},
	}
	shuffleServerWebhook := admissionv1.ValidatingWebhook{
		Name:          "webhook.for.shuffle.server",
		Rules:         []admissionv1.RuleWithOperations{podDeleteRule},
		FailurePolicy: &failurePolicy,
		ClientConfig:  clientConfigFor(webhookconstants.ValidatingPodPath),
		ObjectSelector: &metav1.LabelSelector{
			MatchLabels: map[string]string{
				constants.LabelShuffleServer: "true",
			},
		},
		SideEffects:             &sideEffects,
		AdmissionReviewVersions: []string{"v1", "v1beta1"},
		TimeoutSeconds:          pointer.Int32(30),
	}

	rssWebhook := admissionv1.ValidatingWebhook{
		Name:                    "webhook.for.rss",
		Rules:                   buildRules(),
		FailurePolicy:           &failurePolicy,
		ClientConfig:            clientConfigFor(webhookconstants.ValidatingRssPath),
		SideEffects:             &sideEffects,
		AdmissionReviewVersions: []string{"v1", "v1beta1"},
	}

	return []admissionv1.ValidatingWebhook{shuffleServerWebhook, rssWebhook}
}
// generateMutatingWebhooks returns the single mutating webhook
// "webhook.for.rss" of the MutatingWebhookConfiguration. It matches the
// operations and resources returned by buildRules, is served at MutatingRssPath
// on the externalService Service in the current namespace, uses failure policy
// Fail and declares no side effects.
func (cs *configSyncer) generateMutatingWebhooks() []admissionv1.MutatingWebhook {
	failurePolicy := admissionv1.Fail
	sideEffects := admissionv1.SideEffectClassNone

	rssWebhook := admissionv1.MutatingWebhook{
		Name:          "webhook.for.rss",
		Rules:         buildRules(),
		FailurePolicy: &failurePolicy,
		ClientConfig: admissionv1.WebhookClientConfig{
			CABundle: cs.caCert,
			Service: &admissionv1.ServiceReference{
				Name:      cs.externalService,
				Namespace: utils.GetCurrentNamespace(),
				Path:      pointer.StringPtr(webhookconstants.MutatingRssPath),
			},
		},
		SideEffects:             &sideEffects,
		AdmissionReviewVersions: []string{"v1", "v1beta1"},
	}

	return []admissionv1.MutatingWebhook{rssWebhook}
}
// buildRules returns the admission rule shared by the "webhook.for.rss"
// webhooks: CREATE and UPDATE operations on the "remoteshuffleservices"
// resource in the group and version of unifflev1alpha1.SchemeGroupVersion.
func buildRules() []admissionv1.RuleWithOperations {
	rssRule := admissionv1.RuleWithOperations{
		Operations: []admissionv1.OperationType{admissionv1.Create, admissionv1.Update},
	}
	rssRule.APIGroups = []string{unifflev1alpha1.SchemeGroupVersion.Group}
	rssRule.APIVersions = []string{unifflev1alpha1.SchemeGroupVersion.Version}
	rssRule.Resources = []string{"remoteshuffleservices"}

	return []admissionv1.RuleWithOperations{rssRule}
}