deploy/kubernetes/operator/pkg/webhook/manager.go (179 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package webhook
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"net"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/manager"
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/config"
webhookconstants "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/constants"
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/inspector"
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/syncer"
)
const (
certsSecretName = "rss-webhook-certs"
serverCert = "server.crt"
serverKey = "server.key"
caCert = "ca.crt"
)
// AdmissionManager manages admission webhook server for shuffle servers.
type AdmissionManager interface {
manager.Runnable
}
// NewAdmissionManager creates an AdmissionManager.
func NewAdmissionManager(cfg *config.Config) AdmissionManager {
return newAdmissionManager(cfg)
}
// newAdmissionManager creates an admissionManager.
func newAdmissionManager(cfg *config.Config) *admissionManager {
am := &admissionManager{
externalService: cfg.ExternalService,
kubeClient: cfg.KubeClient,
}
if !cfg.NeedLoadCertsFromSecret() {
am.caCertBody = cfg.GetCaCert()
am.tlsConfig = cfg.TLSConfig()
} else {
am.loadCertsFromSecret()
}
mgr, err := ctrl.NewManager(cfg.RESTConfig, ctrl.Options{
LeaderElection: true,
LeaderElectionID: cfg.LeaderElectionID(),
LeaderElectionNamespace: utils.GetCurrentNamespace(),
})
if err != nil {
klog.Fatalf("build manager for admission manager failed: %v", err)
}
am.mgr = mgr
am.syncer = syncer.NewConfigSyncer(am.caCertBody, cfg.ExternalService, cfg.KubeClient)
am.inspector = inspector.NewInspector(cfg, am.tlsConfig)
return am
}
// admissionManager implements the AdmissionManager interface.
type admissionManager struct {
externalService string
caCertBody []byte
tlsConfig *tls.Config
kubeClient kubernetes.Interface
mgr manager.Manager
syncer syncer.ConfigSyncer
inspector inspector.Inspector
}
// Start starts the AdmissionManager.
func (am *admissionManager) Start(ctx context.Context) error {
stopCh := ctx.Done()
if err := am.mgr.Add(am.syncer); err != nil {
klog.Errorf("add syncer to mgr of admission manager failed: %v", err)
return err
}
go func() {
if err := am.mgr.Start(ctx); err != nil {
klog.Fatalf("mgr of admission manager started failed: %v", err)
}
}()
go func() {
if err := am.inspector.Start(ctx); err != nil {
klog.Fatalf("start webhook server failed: %v", err)
}
}()
klog.V(2).Info("admission manager started")
<-stopCh
return nil
}
// generateCerts generates certificate and privateKey and ca certificate for admission webhook
// server, and they will be saved in a secret.
func (am *admissionManager) generateCerts(create bool) (
serverCertificate, serverPrivateKey, caCertificate []byte,
err error) {
var caPrivateKey []byte
caPrivateKey, err = utils.SetUpCaKey()
if err != nil {
klog.Errorf("set up ca key failed %v", err)
return nil, nil, nil, err
}
caCertificate, err = utils.SetUpCaCert(webhookconstants.ComponentName, caPrivateKey)
if err != nil {
klog.Errorf("set up ca cert failed %v", err)
return nil, nil, nil, err
}
namespace := utils.GetCurrentNamespace()
domains, ips := subjectAltNames(namespace, am.externalService)
serverCertificate, serverPrivateKey, err = utils.SetUpSignedCertAndKey(domains, ips,
webhookconstants.ComponentName,
caPrivateKey, caCertificate, []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth})
if err != nil {
klog.Errorf("set up server cert error %v", err)
return nil, nil, nil, err
}
if create {
// try to create a new secret to save certificate and privateKey and ca certificate.
_, err = am.kubeClient.CoreV1().Secrets(namespace).Create(context.Background(),
&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: certsSecretName,
Namespace: namespace,
},
Data: map[string][]byte{
serverCert: serverCertificate,
serverKey: serverPrivateKey,
caCert: caCertificate,
},
}, metav1.CreateOptions{})
if err != nil {
klog.Errorf("create new certificate secret error %v", err)
return nil, nil, nil, err
}
} else {
// try to update an old secret to save certificate and privateKey and ca certificate.
if err = utils.UpdateSecret(am.kubeClient, namespace, certsSecretName,
func(secret *corev1.Secret) {
secret.Data = map[string][]byte{
serverCert: serverCertificate,
serverKey: serverPrivateKey,
caCert: caCertificate,
}
}); err != nil {
return nil, nil, nil, err
}
}
return caCertificate, serverCertificate, serverPrivateKey, nil
}
// loadCertsFromSecret loads certificate and privateKey and ca certificate from the secret.
func (am *admissionManager) loadCertsFromSecret() {
namespace := utils.GetCurrentNamespace()
create := false
secret, err := am.kubeClient.CoreV1().Secrets(namespace).Get(context.Background(),
certsSecretName, metav1.GetOptions{})
if err != nil {
if !errors.IsNotFound(err) {
klog.Fatalf("get secret of %v/%v failed: %v", namespace, certsSecretName, err)
}
create = true
}
var serverCertBody, serverKeyBody, caCertBody []byte
if secret == nil || secret.Data == nil || len(secret.Data[serverCert]) == 0 || len(secret.Data[serverKey]) == 0 ||
len(secret.Data[caCert]) == 0 {
caCertBody, serverCertBody, serverKeyBody, err = am.generateCerts(create)
if err != nil {
klog.Fatalf("generate certs failed: %v", err)
}
} else {
caCertBody = secret.Data[caCert]
serverCertBody = secret.Data[serverCert]
serverKeyBody = secret.Data[serverKey]
}
am.caCertBody = caCertBody
cert, err := tls.X509KeyPair(serverCertBody, serverKeyBody)
if err != nil {
klog.Fatalf("generate key pair error :%v", err)
}
am.tlsConfig = &tls.Config{
Certificates: []tls.Certificate{cert},
}
}
// subjectAltNames builds subject alt names by namespace and service name.
func subjectAltNames(namespace, svcName string) ([]string, []net.IP) {
return []string{
"localhost",
svcName,
fmt.Sprintf("%v.%v.svc", svcName, namespace),
fmt.Sprintf("%v.%v.svc.cluster.local", svcName, namespace),
}, []net.IP{net.ParseIP("127.0.0.1")}
}