pkg/authority/k8s/client.go (329 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package k8s
import (
"context"
"flag"
"os"
"path/filepath"
"reflect"
"strings"
"time"
"github.com/apache/dubbo-admin/pkg/authority/cert"
"github.com/apache/dubbo-admin/pkg/authority/config"
infoemerclient "github.com/apache/dubbo-admin/pkg/authority/generated/clientset/versioned"
informers "github.com/apache/dubbo-admin/pkg/authority/generated/informers/externalversions"
"github.com/apache/dubbo-admin/pkg/authority/rule"
"github.com/apache/dubbo-admin/pkg/authority/rule/authentication"
"github.com/apache/dubbo-admin/pkg/authority/rule/authorization"
"github.com/apache/dubbo-admin/pkg/logger"
admissionregistrationV1 "k8s.io/api/admissionregistration/v1"
k8sauth "k8s.io/api/authentication/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)
var kubeconfig string
func init() {
flag.StringVar(&kubeconfig, "kubeconfig", "",
"Paths to a kubeconfig. Only required if out-of-cluster.")
}
type Client interface {
Init(options *config.Options) bool
GetAuthorityCert(namespace string) (string, string)
UpdateAuthorityCert(cert string, pri string, namespace string)
UpdateAuthorityPublicKey(cert string) bool
VerifyServiceAccount(token string, authorizationType string) (*rule.Endpoint, bool)
UpdateWebhookConfig(options *config.Options, storage cert.Storage)
GetNamespaceLabels(namespace string) map[string]string
InitController(paHandler authentication.Handler, apHandler authorization.Handler)
GetKubClient() *kubernetes.Clientset
}
type ClientImpl struct {
options *config.Options
kubeClient *kubernetes.Clientset
informerClient *infoemerclient.Clientset
}
func NewClient() Client {
return &ClientImpl{}
}
func (c *ClientImpl) Init(options *config.Options) bool {
c.options = options
config, err := rest.InClusterConfig()
options.InPodEnv = err == nil
if err != nil {
logger.Sugar().Infof("Failed to load config from Pod. Will fall back to kube config file.")
// Read kubeconfig from command line
if len(kubeconfig) <= 0 {
// Read kubeconfig from env
kubeconfig = os.Getenv(clientcmd.RecommendedConfigPathEnvVar)
if len(kubeconfig) <= 0 {
// Read kubeconfig from home dir
if home := homedir.HomeDir(); home != "" {
kubeconfig = filepath.Join(home, ".kube", "config")
}
}
}
// use the current context in kubeconfig
logger.Sugar().Infof("Read kubeconfig from %s", kubeconfig)
config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
logger.Sugar().Warnf("Failed to load config from kube config file.")
return false
}
}
// set qps and burst for rest config
config.QPS = float32(c.options.RestConfigQps)
config.Burst = c.options.RestConfigBurst
// creates the clientset
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
logger.Sugar().Warnf("Failed to create client to kubernetes. " + err.Error())
return false
}
informerClient, err := infoemerclient.NewForConfig(config)
if err != nil {
logger.Sugar().Warnf("Failed to create client to kubernetes. " + err.Error())
return false
}
c.kubeClient = clientSet
c.informerClient = informerClient
return true
}
func (c *ClientImpl) GetAuthorityCert(namespace string) (string, string) {
s, err := c.kubeClient.CoreV1().Secrets(namespace).Get(context.TODO(), "dubbo-ca-secret", metav1.GetOptions{})
if err != nil {
logger.Sugar().Warnf("Unable to get authority cert secret from kubernetes. " + err.Error())
}
return string(s.Data["cert.pem"]), string(s.Data["pri.pem"])
}
func (c *ClientImpl) UpdateAuthorityCert(cert string, pri string, namespace string) {
s, err := c.kubeClient.CoreV1().Secrets(namespace).Get(context.TODO(), "dubbo-ca-secret", metav1.GetOptions{})
if err != nil {
logger.Sugar().Warnf("Unable to get ca secret from kubernetes. Will try to create. " + err.Error())
s = &v1.Secret{
Data: map[string][]byte{
"cert.pem": []byte(cert),
"pri.pem": []byte(pri),
},
}
s.Name = "dubbo-ca-secret"
_, err = c.kubeClient.CoreV1().Secrets(namespace).Create(context.TODO(), s, metav1.CreateOptions{})
if err != nil {
logger.Sugar().Warnf("Failed to create ca secret to kubernetes. " + err.Error())
} else {
logger.Sugar().Info("Create ca secret to kubernetes success. ")
}
}
if string(s.Data["cert.pem"]) == cert && string(s.Data["pri.pem"]) == pri {
logger.Sugar().Info("Ca secret in kubernetes is already the newest vesion.")
return
}
s.Data["cert.pem"] = []byte(cert)
s.Data["pri.pem"] = []byte(pri)
_, err = c.kubeClient.CoreV1().Secrets(namespace).Update(context.TODO(), s, metav1.UpdateOptions{})
if err != nil {
logger.Sugar().Warnf("Failed to update ca secret to kubernetes. " + err.Error())
} else {
logger.Sugar().Info("Update ca secret to kubernetes success. ")
}
}
func (c *ClientImpl) UpdateAuthorityPublicKey(cert string) bool {
ns, err := c.kubeClient.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{})
if err != nil {
logger.Sugar().Warnf("Failed to get namespaces. " + err.Error())
return false
}
for _, n := range ns.Items {
if n.Name == "kube-system" {
continue
}
cm, err := c.kubeClient.CoreV1().ConfigMaps(n.Name).Get(context.TODO(), "dubbo-ca-cert", metav1.GetOptions{})
if err != nil {
logger.Sugar().Warnf("Unable to find dubbo-ca-cert in " + n.Name + ". Will create config map. " + err.Error())
cm = &v1.ConfigMap{
Data: map[string]string{
"ca.crt": cert,
},
}
cm.Name = "dubbo-ca-cert"
_, err = c.kubeClient.CoreV1().ConfigMaps(n.Name).Create(context.TODO(), cm, metav1.CreateOptions{})
if err != nil {
logger.Sugar().Warnf("Failed to create config map for " + n.Name + ". " + err.Error())
return false
} else {
logger.Sugar().Info("Create ca config map for " + n.Name + " success.")
}
}
if cm.Data["ca.crt"] == cert {
logger.Sugar().Info("Ignore override ca to " + n.Name + ". Cause: Already exist.")
continue
}
cm.Data["ca.crt"] = cert
_, err = c.kubeClient.CoreV1().ConfigMaps(n.Name).Update(context.TODO(), cm, metav1.UpdateOptions{})
if err != nil {
logger.Sugar().Warnf("Failed to update config map for " + n.Name + ". " + err.Error())
return false
} else {
logger.Sugar().Info("Update ca config map for " + n.Name + " success.")
}
}
return true
}
func (c *ClientImpl) GetNamespaceLabels(namespace string) map[string]string {
ns, err := c.kubeClient.CoreV1().Namespaces().Get(context.TODO(), namespace, metav1.GetOptions{})
if err != nil {
logger.Sugar().Warnf("Failed to validate token. " + err.Error())
return map[string]string{}
}
if ns.Labels != nil {
return ns.Labels
}
return map[string]string{}
}
func (c *ClientImpl) VerifyServiceAccount(token string, authorizationType string) (*rule.Endpoint, bool) {
var tokenReview *k8sauth.TokenReview
if authorizationType == "dubbo-ca-token" {
tokenReview = &k8sauth.TokenReview{
Spec: k8sauth.TokenReviewSpec{
Token: token,
Audiences: []string{"dubbo-ca"},
},
}
} else {
tokenReview = &k8sauth.TokenReview{
Spec: k8sauth.TokenReviewSpec{
Token: token,
},
}
}
reviewRes, err := c.kubeClient.AuthenticationV1().TokenReviews().Create(
context.TODO(), tokenReview, metav1.CreateOptions{})
if err != nil {
logger.Sugar().Warnf("Failed to validate token. " + err.Error())
return nil, false
}
if reviewRes.Status.Error != "" {
logger.Sugar().Warnf("Failed to validate token. " + reviewRes.Status.Error)
return nil, false
}
names := strings.Split(reviewRes.Status.User.Username, ":")
if len(names) != 4 {
logger.Sugar().Warnf("Token is not a pod service account. " + reviewRes.Status.User.Username)
return nil, false
}
namespace := names[2]
podName := reviewRes.Status.User.Extra["authentication.kubernetes.io/pod-name"]
podUid := reviewRes.Status.User.Extra["authentication.kubernetes.io/pod-uid"]
if len(podName) != 1 || len(podUid) != 1 {
logger.Sugar().Warnf("Token is not a pod service account. " + reviewRes.Status.User.Username)
return nil, false
}
pod, err := c.kubeClient.CoreV1().Pods(namespace).Get(context.TODO(), podName[0], metav1.GetOptions{})
if err != nil {
logger.Sugar().Warnf("Failed to get pod. " + err.Error())
return nil, false
}
if pod.UID != types.UID(podUid[0]) {
logger.Sugar().Warnf("Token is not a pod service account. " + reviewRes.Status.User.Username)
return nil, false
}
e := &rule.Endpoint{}
e.ID = string(pod.UID)
for _, i := range pod.Status.PodIPs {
if i.IP != "" {
e.Ips = append(e.Ips, i.IP)
}
}
e.SpiffeID = "spiffe://cluster.local/ns/" + pod.Namespace + "/sa/" + pod.Spec.ServiceAccountName
if strings.HasPrefix(reviewRes.Status.User.Username, "system:serviceaccount:") {
names := strings.Split(reviewRes.Status.User.Username, ":")
if len(names) == 4 {
e.SpiffeID = "spiffe://cluster.local/ns/" + names[2] + "/sa/" + names[3]
}
}
e.KubernetesEnv = &rule.KubernetesEnv{
Namespace: pod.Namespace,
PodName: pod.Name,
PodLabels: pod.Labels,
PodAnnotations: pod.Annotations,
}
return e, true
}
func (c *ClientImpl) UpdateWebhookConfig(options *config.Options, storage cert.Storage) {
path := "/mutating-services"
failurePolicy := admissionregistrationV1.Ignore
sideEffects := admissionregistrationV1.SideEffectClassNone
bundle := storage.GetAuthorityCert().CertPem
mwConfig, err := c.kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().Get(context.TODO(), "dubbo-ca", metav1.GetOptions{})
if err != nil {
logger.Sugar().Warnf("Unable to find dubbo-ca webhook config. Will create. " + err.Error())
mwConfig = &admissionregistrationV1.MutatingWebhookConfiguration{
ObjectMeta: metav1.ObjectMeta{
Name: "dubbo-ca",
},
Webhooks: []admissionregistrationV1.MutatingWebhook{
{
Name: "dubbo-ca" + ".k8s.io",
ClientConfig: admissionregistrationV1.WebhookClientConfig{
Service: &admissionregistrationV1.ServiceReference{
Name: options.ServiceName,
Namespace: options.Namespace,
Port: &options.WebhookPort,
Path: &path,
},
CABundle: []byte(bundle),
},
FailurePolicy: &failurePolicy,
Rules: []admissionregistrationV1.RuleWithOperations{
{
Operations: []admissionregistrationV1.OperationType{
admissionregistrationV1.Create,
},
Rule: admissionregistrationV1.Rule{
APIGroups: []string{""},
APIVersions: []string{"v1"},
Resources: []string{"pods"},
},
},
},
//NamespaceSelector: &metav1.LabelSelector{
// MatchLabels: map[string]string{
// "dubbo-injection": "enabled",
// },
//},
//ObjectSelector: &metav1.LabelSelector{
// MatchLabels: map[string]string{
// "dubbo-injection": "enabled",
// },
//},
SideEffects: &sideEffects,
AdmissionReviewVersions: []string{"v1"},
},
},
}
_, err := c.kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().Create(context.TODO(), mwConfig, metav1.CreateOptions{})
if err != nil {
logger.Sugar().Warnf("Failed to create webhook config. " + err.Error())
} else {
logger.Sugar().Info("Create webhook config success.")
}
return
}
if reflect.DeepEqual(mwConfig.Webhooks[0].ClientConfig.CABundle, []byte(bundle)) {
logger.Sugar().Info("Ignore override webhook config. Cause: Already exist.")
return
}
mwConfig.Webhooks[0].ClientConfig.CABundle = []byte(bundle)
_, err = c.kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().Update(context.TODO(), mwConfig, metav1.UpdateOptions{})
if err != nil {
logger.Sugar().Warnf("Failed to update webhook config. " + err.Error())
} else {
logger.Sugar().Info("Update webhook config success.")
}
}
func (c *ClientImpl) InitController(
authenticationHandler authentication.Handler,
authorizationHandler authorization.Handler,
) {
logger.Sugar().Info("Init rule controller...")
informerFactory := informers.NewSharedInformerFactory(c.informerClient, time.Second*30)
stopCh := make(chan struct{})
controller := NewController(c.informerClient,
c.options.Namespace,
authenticationHandler,
authorizationHandler,
informerFactory.Dubbo().V1beta1().AuthenticationPolicies(),
informerFactory.Dubbo().V1beta1().AuthorizationPolicies())
informerFactory.Start(stopCh)
controller.WaitSynced()
}
func (c *ClientImpl) GetKubClient() *kubernetes.Clientset {
return c.kubeClient
}