reconcilers/reconciler.go (363 lines of code) (raw):
package reconcilers
import (
"bytes"
"context"
"errors"
"reflect"
"strings"
"time"
admissionregistration "k8s.io/api/admissionregistration/v1"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/kubernetes"
"github.com/Azure/webhook-tls-manager/config"
"github.com/Azure/webhook-tls-manager/consts"
"github.com/Azure/webhook-tls-manager/goalresolvers"
"github.com/Azure/webhook-tls-manager/metrics"
"github.com/Azure/webhook-tls-manager/toolkit/log"
"github.com/Azure/webhook-tls-manager/utils"
)
const (
retryCount = 10
retryInterval = 5 * time.Second
retryTimeout = 15 * time.Second
)
func currentWebhookConfigAndConfigmapDifferent(ctx context.Context, currentWebhookConfig *admissionregistration.MutatingWebhookConfiguration,
// If the mutating webhook configuration has multiple webhooks, the result of this function is not accurate.
// Because the reflect.DeepEqual function is impacted by the order of array elements.
webhookConfigFromConfig *admissionregistration.MutatingWebhookConfiguration) bool {
logger := log.MustGetLogger(ctx)
if !reflect.DeepEqual(currentWebhookConfig.ObjectMeta.Labels, webhookConfigFromConfig.ObjectMeta.Labels) {
logger.Info(ctx, "currentWebhookConfig.ObjectMeta different from webhookConfigFromConfig.ObjectMeta.Labels")
logger.Debugf(ctx, "currentWebhookConfig.ObjectMeta.Labels: %v", currentWebhookConfig.ObjectMeta.Labels)
logger.Debugf(ctx, "webhookConfigFromConfig.ObjectMeta.Labels: %v", webhookConfigFromConfig.ObjectMeta.Labels)
return true
}
if !reflect.DeepEqual(currentWebhookConfig.Webhooks[0].ClientConfig.Service, webhookConfigFromConfig.Webhooks[0].ClientConfig.Service) ||
!reflect.DeepEqual(currentWebhookConfig.Webhooks[0].Name, webhookConfigFromConfig.Webhooks[0].Name) ||
!reflect.DeepEqual(currentWebhookConfig.Webhooks[0].NamespaceSelector, webhookConfigFromConfig.Webhooks[0].NamespaceSelector) ||
!reflect.DeepEqual(currentWebhookConfig.Webhooks[0].ObjectSelector, webhookConfigFromConfig.Webhooks[0].ObjectSelector) ||
!reflect.DeepEqual(currentWebhookConfig.Webhooks[0].Rules, webhookConfigFromConfig.Webhooks[0].Rules) {
logger.Info(ctx, "currentWebhookConfig.Webhooks[0] different from webhookConfigFromConfig.Webhooks[0]")
logger.Debugf(ctx, "currentWebhookConfig.Webhooks[0]: %v", currentWebhookConfig.Webhooks[0])
logger.Debugf(ctx, "webhookConfigFromConfig.Webhooks[0]: %v", webhookConfigFromConfig.Webhooks[0])
return true
}
return false
}
func shouldUpdateWebhook(ctx context.Context, webhookConfig *admissionregistration.MutatingWebhookConfiguration,
isKubeSystemNamespaceBlocked bool, clientset kubernetes.Interface) (bool, *error) {
logger := log.MustGetLogger(ctx)
admissionEnforcerDisabled, labelExist := webhookConfig.Labels[consts.AdmissionEnforcerDisabledLabel]
//If the value of admissionEnforcerDisabled is false, the kube-system namespace is blocked.
if isKubeSystemNamespaceBlocked {
logger.Info(ctx, "kube-system should be blocked")
if labelExist && admissionEnforcerDisabled == consts.AdmissionEnforcerDisabledValue {
return true, nil
}
} else {
logger.Info(ctx, "kube-system should be unblocked")
if !labelExist || admissionEnforcerDisabled != consts.AdmissionEnforcerDisabledValue {
logger.Info(ctx, "update webhookConfig for label")
return true, nil
}
}
secret, getErr := clientset.CoreV1().Secrets(config.AppConfig.Namespace).Get(ctx, utils.SecretName(), metav1.GetOptions{})
if getErr != nil {
logger.Errorf(ctx, "get secret error: %s", getErr)
return false, &getErr
}
caCert := secret.Data["caCert.pem"]
if len(webhookConfig.Webhooks) == 0 ||
!bytes.Equal(webhookConfig.Webhooks[0].ClientConfig.CABundle, caCert) {
logger.Info(ctx, "update webhookConfig for CABundle")
logger.Debugf(ctx, "webhookConfig.Webhooks[0].ClientConfig.CABundle: %x", webhookConfig.Webhooks[0].ClientConfig.CABundle)
logger.Debugf(ctx, "caCert: %x", caCert)
return true, nil
}
webhookConfigFromConfig, err := getMutatingWebhookConfigFromConfigmap(ctx, clientset, caCert, isKubeSystemNamespaceBlocked)
if err != nil {
logger.Errorf(ctx, "get webhookConfig from configmap error: %s", *err)
return false, err
}
if currentWebhookConfigAndConfigmapDifferent(ctx, webhookConfig, webhookConfigFromConfig) {
logger.Info(ctx, "update webhookConfig for webhookConfigFromConfig")
return true, nil
}
return false, nil
}
func createOrUpdateSecret(ctx context.Context, clientset kubernetes.Interface, data goalresolvers.CertificateData) *error {
logger := log.MustGetLogger(ctx)
secret, getErr := clientset.CoreV1().Secrets(config.AppConfig.Namespace).Get(ctx, utils.SecretName(), metav1.GetOptions{})
if k8serrors.IsNotFound(getErr) {
logger.Infof(ctx, "create secret %s", utils.SecretName())
cerr := createTlsSecret(ctx, clientset, data)
if cerr != nil {
logger.Errorf(ctx, "fail to create secret %s. error: %s", utils.SecretName(), *cerr)
return cerr
}
return nil
}
if getErr != nil {
logger.Errorf(ctx, "get secret %s failed. error: %s", utils.SecretName(), getErr)
return &getErr
}
// Label has been checked in the goal resolver
cerr := updateTlsSecret(ctx, clientset, data, secret)
if cerr != nil {
logger.Errorf(ctx, "fail to update secret %s. error: %s", utils.SecretName(), *cerr)
return cerr
}
return nil
}
func createOrUpdateWebhook(ctx context.Context, clientset kubernetes.Interface, isKubeSystemNamespaceBlocked bool) *error {
logger := log.MustGetLogger(ctx)
secret, err := clientset.CoreV1().Secrets(config.AppConfig.Namespace).Get(ctx, utils.SecretName(), metav1.GetOptions{})
if err != nil {
logger.Infof(ctx, "fail to get secret %s. error: %s", utils.SecretName(), err)
return &err
}
client := clientset.AdmissionregistrationV1().MutatingWebhookConfigurations()
webhook, getErr := client.Get(ctx, utils.WebhookConfigName(), metav1.GetOptions{})
if k8serrors.IsNotFound(getErr) {
logger.Infof(ctx, "mutating webhook configuration %s doesn't exist", utils.WebhookConfigName())
cerr := createMutatingWebhookConfig(ctx, clientset, secret.Data["caCert.pem"], isKubeSystemNamespaceBlocked)
if cerr != nil {
logger.Errorf(ctx, "Create mutating webhook configuration failed. error: %s", *cerr)
return cerr
}
logger.Info(ctx, "Create mutating webhook configuration succeed.")
return nil
}
if getErr != nil {
logger.Errorf(ctx, "get mutating webhook configuration error: %s", getErr)
return &getErr
}
if v, exist := webhook.ObjectMeta.Labels[consts.ManagedLabelKey]; !exist || v != consts.ManagedLabelValue {
logger.Warningf(ctx, "found mutating webhook configuration %s not managed by AKS", utils.WebhookConfigName())
return nil
}
logger.Infof(ctx, "mutating webhook configuration %s is managed by AKS", utils.WebhookConfigName())
shouldUpdate, cerr := shouldUpdateWebhook(ctx, webhook, isKubeSystemNamespaceBlocked, clientset)
if cerr != nil {
return cerr
}
if shouldUpdate {
cerr = updateMutatingWebhookConfig(ctx, clientset, isKubeSystemNamespaceBlocked, secret.Data["caCert.pem"])
if cerr != nil {
logger.Errorf(ctx, "Update mutating webhook configuration failed. error: %s", *cerr)
return cerr
}
logger.Info(ctx, "Update mutating webhook configuration succeed.")
}
return nil
}
func cleanupSecretAndWebhook(ctx context.Context, clientset kubernetes.Interface) *error {
logger := log.MustGetLogger(ctx)
deleteErr := clientset.CoreV1().Secrets(config.AppConfig.Namespace).Delete(ctx, utils.SecretName(), metav1.DeleteOptions{})
if deleteErr != nil {
logger.Errorf(ctx, "failed to cleanup secret %s. error: %s", utils.SecretName(), deleteErr)
return &deleteErr
}
logger.Infof(ctx, "cleanup secret %s succeed.", utils.SecretName())
client := clientset.AdmissionregistrationV1().MutatingWebhookConfigurations()
deleteErr = client.Delete(ctx, utils.WebhookConfigName(), metav1.DeleteOptions{})
if deleteErr != nil {
logger.Errorf(ctx, "failed to cleanup mutating webhook configuration %s. error: %s", utils.WebhookConfigName(), deleteErr)
return &deleteErr
}
logger.Infof(ctx, "cleanup webhook %s succeed.", utils.WebhookConfigName())
return nil
}
func createTlsSecret(ctx context.Context, clientset kubernetes.Interface, data goalresolvers.CertificateData) *error {
logger := log.MustGetLogger(ctx)
secret := &corev1.Secret{
TypeMeta: metav1.TypeMeta{
Kind: "Secret",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: utils.SecretName(),
Namespace: config.AppConfig.Namespace,
Labels: map[string]string{
consts.ManagedLabelKey: consts.ManagedLabelValue,
},
},
Data: map[string][]byte{
"caCert.pem": data.CaCertPem,
"caKey.pem": data.CaKeyPem,
"serverCert.pem": data.ServerCertPem,
"serverKey.pem": data.ServerKeyPem,
},
Type: "Opaque",
}
_, createErr := clientset.CoreV1().Secrets(config.AppConfig.Namespace).Create(ctx, secret, metav1.CreateOptions{})
if createErr != nil {
logger.Errorf(ctx, "create secret %s failed. error: %s", utils.SecretName(), createErr)
return &createErr
}
logger.Infof(ctx, "secret %s created.", utils.SecretName())
return nil
}
func updateTlsSecret(ctx context.Context, clientset kubernetes.Interface, data goalresolvers.CertificateData, secret *corev1.Secret) *error {
logger := log.MustGetLogger(ctx)
secret.Data["caCert.pem"] = data.CaCertPem
secret.Data["caKey.pem"] = data.CaKeyPem
secret.Data["serverCert.pem"] = data.ServerCertPem
secret.Data["serverKey.pem"] = data.ServerKeyPem
_, updateErr := clientset.CoreV1().Secrets(config.AppConfig.Namespace).Update(ctx, secret, metav1.UpdateOptions{})
if updateErr != nil {
logger.Errorf(ctx, "update secret %s failed. error: %s", utils.SecretName(), updateErr)
return &updateErr
}
logger.Infof(ctx, "secret %s updated.", utils.SecretName())
return nil
}
func getMutatingWebhookConfigFromConfigmap(ctx context.Context, clientset kubernetes.Interface, caCert []byte, isKubeSystemNamespaceBlocked bool) (*admissionregistration.MutatingWebhookConfiguration, *error) {
logger := log.MustGetLogger(ctx)
name := config.AppConfig.ObjectName + "-webhook-config"
cm, err := clientset.CoreV1().ConfigMaps(config.AppConfig.Namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
logger.Errorf(ctx, "get webhook-config configmap failed. error: %s", err)
return nil, &err
}
logger.Infof(ctx, "get webhook-config configmap succeed.")
logger.Debugf(ctx, "configmap: %v", cm)
mutatingWebhookConfigJson := cm.Data["mutatingWebhookConfig"]
if mutatingWebhookConfigJson == "" {
logger.Errorf(ctx, "mutatingWebhookConfig is empty")
err = errors.New("mutatingWebhookConfig is empty")
return nil, &err
}
logger.Infof(ctx, "get mutatingWebhookConfig succeed. mutatingWebhookConfig: %s", mutatingWebhookConfigJson)
logger.Debugf(ctx, "mutatingWebhookConfig: %s", mutatingWebhookConfigJson)
var mutatingWebhookConfig admissionregistration.MutatingWebhookConfiguration
err = yaml.NewYAMLOrJSONDecoder(strings.NewReader(mutatingWebhookConfigJson), 1024).Decode(&mutatingWebhookConfig)
if err != nil {
logger.Errorf(ctx, "unmarshal mutatingWebhookConfig failed. error: %s", err)
return nil, &err
}
logger.Infof(ctx, "unmarshal mutatingWebhookConfig succeed.")
logger.Debugf(ctx, "mutatingWebhookConfig: %v", mutatingWebhookConfig)
for i := range mutatingWebhookConfig.Webhooks {
mutatingWebhookConfig.Webhooks[i].ClientConfig.CABundle = caCert
}
var labels map[string]string
if !isKubeSystemNamespaceBlocked {
logger.Info(ctx, "kube-system is unblocked.")
labels = map[string]string{
consts.ManagedLabelKey: consts.ManagedLabelValue,
consts.AdmissionEnforcerDisabledLabel: consts.AdmissionEnforcerDisabledValue,
}
} else {
logger.Info(ctx, "kube-system is blocked.")
labels = map[string]string{
consts.ManagedLabelKey: consts.ManagedLabelValue,
}
}
mutatingWebhookConfig.Labels = labels
logger.Debugf(ctx, "mutatingWebhookConfig from configmap: %v", mutatingWebhookConfig)
return &mutatingWebhookConfig, nil
}
func createMutatingWebhookConfig(ctx context.Context, clientset kubernetes.Interface, caCert []byte, isKubeSystemNamespaceBlocked bool) *error {
logger := log.MustGetLogger(ctx)
mutatingWebhookConfig, err := getMutatingWebhookConfigFromConfigmap(ctx, clientset, caCert, isKubeSystemNamespaceBlocked)
if err != nil {
logger.Errorf(ctx, "get mutating webhook config failed. error: %s", *err)
return err
}
client := clientset.AdmissionregistrationV1().MutatingWebhookConfigurations()
_, createErr := client.Create(ctx, mutatingWebhookConfig, metav1.CreateOptions{})
if createErr != nil {
logger.Errorf(ctx, "create mutating webhook configuration %s failed. error: %s", utils.WebhookConfigName(), createErr)
return &createErr
}
logger.Infof(ctx, "mutating webhook configuration %s created.", utils.WebhookConfigName())
return nil
}
func updateMutatingWebhookConfig(ctx context.Context, clientset kubernetes.Interface, isKubeSystemNamespaceBlocked bool, data []byte) *error {
logger := log.MustGetLogger(ctx)
client := clientset.AdmissionregistrationV1().MutatingWebhookConfigurations()
webhook, getErr := client.Get(ctx, utils.WebhookConfigName(), metav1.GetOptions{})
if getErr != nil {
logger.Infof(ctx, "fail to get mutating webhook config %s. error: %s", utils.WebhookConfigName(), getErr)
return &getErr
}
webhookFromCm, readErr := getMutatingWebhookConfigFromConfigmap(ctx, clientset, data, isKubeSystemNamespaceBlocked)
if readErr != nil {
logger.Infof(ctx, "fail to get mutating webhook config from configmap. error: %s", *readErr)
return readErr
}
webhook.ObjectMeta.Labels = webhookFromCm.ObjectMeta.Labels
webhook.Webhooks = webhookFromCm.Webhooks
logger.Debugf(ctx, "webhook before update: %v", webhook)
_, updateErr := client.Update(ctx, webhook, metav1.UpdateOptions{})
if updateErr != nil {
logger.Infof(ctx, "fail to update mutating webhook config %s. error: %s", utils.WebhookConfigName(), updateErr)
return &updateErr
}
return nil
}
type webhookTlsManagerReconciler struct {
webhookTlsManagerGoalResolver goalresolvers.WebhookTlsManagerGoalResolverInterface
kubeClient kubernetes.Interface
}
func NewWebhookTlsManagerReconciler(webhookTlsManagerGoalResolver goalresolvers.WebhookTlsManagerGoalResolverInterface, kubeClient kubernetes.Interface) Reconciler {
return &webhookTlsManagerReconciler{
webhookTlsManagerGoalResolver: webhookTlsManagerGoalResolver,
kubeClient: kubeClient,
}
}
func (r *webhookTlsManagerReconciler) reconcileOnce(ctx context.Context) *error {
logger := log.MustGetLogger(ctx)
goal, cerr := r.webhookTlsManagerGoalResolver.Resolve(ctx)
if cerr != nil {
logger.Errorf(ctx, "Resolve webhook goal failed. error: %s", *cerr)
return cerr
}
if !goal.IsWebhookTlsManagerEnabled {
cerr = cleanupSecretAndWebhook(ctx, r.kubeClient)
if cerr != nil {
logger.Errorf(ctx, "cleanupSecretAndWebhook error: %s", *cerr)
return cerr
}
logger.Info(ctx, "WebhookTlsManager is disabled. cleanup succeed.")
return nil
}
// Rotate certificates.
if goal.CertData != nil {
metrics.RotateCertificateMetric.Set(1)
cerr = createOrUpdateSecret(ctx, r.kubeClient, *goal.CertData)
if cerr != nil {
logger.Errorf(ctx, "createOrUpdateSecret failed. error: %s", *cerr)
return cerr
}
} else {
metrics.RotateCertificateMetric.Set(0)
}
cerr = createOrUpdateWebhook(ctx, r.kubeClient, goal.IsKubeSystemNamespaceBlocked)
if cerr != nil {
logger.Errorf(ctx, "createOrUpdateWebhook failed. error: %s", *cerr)
return cerr
}
return nil
}
func (r *webhookTlsManagerReconciler) Reconcile(ctx context.Context) *error {
logger := log.MustGetLogger(ctx)
logger.Info(ctx, "Start reconciling webhook.")
currentTime := time.Now()
var cerr *error
for i := 0; i < retryCount; i++ {
if time.Since(currentTime) > retryTimeout {
err := errors.New("reconcileOnce timeout")
logger.Errorf(ctx, "reconcileOnce timeout.")
return &err
}
cerr = r.reconcileOnce(ctx)
if cerr == nil {
logger.Info(ctx, "Reconcile webhook succeed.")
return nil
}
logger.Warningf(ctx, "reconcileOnce failed. error: %s", *cerr)
time.Sleep(retryInterval)
}
logger.Error(ctx, "Reconcile webhook succeed.")
return cerr
}