pkg/kube/multicluster/secretcontroller.go (476 lines of code) (raw):

// Copyright Istio Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package multicluster import ( "bytes" "context" "crypto/sha256" "errors" "fmt" "sync" "time" ) import ( "github.com/hashicorp/go-multierror" "go.uber.org/atomic" "istio.io/pkg/log" "istio.io/pkg/monitoring" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd/api" ) import ( "github.com/apache/dubbo-go-pixiu/pilot/pkg/features" "github.com/apache/dubbo-go-pixiu/pkg/cluster" "github.com/apache/dubbo-go-pixiu/pkg/kube" "github.com/apache/dubbo-go-pixiu/pkg/kube/controllers" "github.com/apache/dubbo-go-pixiu/pkg/util/sets" ) const ( MultiClusterSecretLabel = "istio/multiCluster" ) func init() { monitoring.MustRegister(timeouts) monitoring.MustRegister(clustersCount) } var ( timeouts = monitoring.NewSum( "remote_cluster_sync_timeouts_total", "Number of times remote clusters took too long to sync, causing slow startup that excludes remote clusters.", ) clusterType = monitoring.MustCreateLabel("cluster_type") clustersCount = monitoring.NewGauge( "istiod_managed_clusters", "Number of clusters managed by istiod", monitoring.WithLabels(clusterType), ) localClusters = clustersCount.With(clusterType.Value("local")) remoteClusters = clustersCount.With(clusterType.Value("remote")) ) type ClusterHandler interface { ClusterAdded(cluster *Cluster, stop <-chan struct{}) error ClusterUpdated(cluster *Cluster, stop <-chan struct{}) error ClusterDeleted(clusterID cluster.ID) error } // Controller is the controller implementation for Secret resources type Controller struct { namespace string localClusterID cluster.ID localClusterClient kube.Client queue controllers.Queue informer cache.SharedIndexInformer cs *ClusterStore handlers []ClusterHandler once sync.Once syncInterval time.Duration remoteSyncTimeout atomic.Bool } // Cluster defines cluster struct type Cluster struct { // ID of the cluster. ID cluster.ID // SyncTimeout is marked after features.RemoteClusterTimeout. SyncTimeout *atomic.Bool // Client for accessing the cluster. Client kube.Client kubeConfigSha [sha256.Size]byte stop chan struct{} // initialSync is marked when RunAndWait completes initialSync *atomic.Bool } // Stop channel which is closed when the cluster is removed or the Controller that created the client is stopped. // Client.RunAndWait is called using this channel. func (r *Cluster) Stop() <-chan struct{} { return r.stop } func (c *Controller) AddHandler(h ClusterHandler) { log.Infof("handling remote clusters in %T", h) c.handlers = append(c.handlers, h) } // Run starts the cluster's informers and waits for caches to sync. Once caches are synced, we mark the cluster synced. // This should be called after each of the handlers have registered informers, and should be run in a goroutine. func (r *Cluster) Run() { r.Client.RunAndWait(r.Stop()) r.initialSync.Store(true) } func (r *Cluster) HasSynced() bool { return r.initialSync.Load() || r.SyncTimeout.Load() } func (r *Cluster) SyncDidTimeout() bool { return r.SyncTimeout.Load() && !r.HasSynced() } // ClusterStore is a collection of clusters type ClusterStore struct { sync.RWMutex // keyed by secret key(ns/name)->clusterID remoteClusters map[string]map[cluster.ID]*Cluster clusters sets.Set } // newClustersStore initializes data struct to store clusters information func newClustersStore() *ClusterStore { return &ClusterStore{ remoteClusters: make(map[string]map[cluster.ID]*Cluster), clusters: sets.New(), } } func (c *ClusterStore) Store(secretKey string, clusterID cluster.ID, value *Cluster) { c.Lock() defer c.Unlock() if _, ok := c.remoteClusters[secretKey]; !ok { c.remoteClusters[secretKey] = make(map[cluster.ID]*Cluster) } c.remoteClusters[secretKey][clusterID] = value c.clusters.Insert(string(clusterID)) } func (c *ClusterStore) Delete(secretKey string, clusterID cluster.ID) { c.Lock() defer c.Unlock() delete(c.remoteClusters[secretKey], clusterID) c.clusters.Delete(string(clusterID)) if len(c.remoteClusters[secretKey]) == 0 { delete(c.remoteClusters, secretKey) } } func (c *ClusterStore) Get(secretKey string, clusterID cluster.ID) *Cluster { c.RLock() defer c.RUnlock() if _, ok := c.remoteClusters[secretKey]; !ok { return nil } return c.remoteClusters[secretKey][clusterID] } func (c *ClusterStore) Contains(clusterID cluster.ID) bool { c.RLock() defer c.RUnlock() return c.clusters.Contains(string(clusterID)) } func (c *ClusterStore) GetByID(clusterID cluster.ID) *Cluster { c.RLock() defer c.RUnlock() for _, clusters := range c.remoteClusters { c, ok := clusters[clusterID] if ok { return c } } return nil } // All returns a copy of the current remote clusters. func (c *ClusterStore) All() map[string]map[cluster.ID]*Cluster { if c == nil { return nil } c.RLock() defer c.RUnlock() out := make(map[string]map[cluster.ID]*Cluster, len(c.remoteClusters)) for secret, clusters := range c.remoteClusters { out[secret] = make(map[cluster.ID]*Cluster, len(clusters)) for cid, c := range clusters { outCluster := *c out[secret][cid] = &outCluster } } return out } // GetExistingClustersFor return existing clusters registered for the given secret func (c *ClusterStore) GetExistingClustersFor(secretKey string) []*Cluster { c.RLock() defer c.RUnlock() out := make([]*Cluster, 0, len(c.remoteClusters[secretKey])) for _, cluster := range c.remoteClusters[secretKey] { out = append(out, cluster) } return out } func (c *ClusterStore) Len() int { c.Lock() defer c.Unlock() out := 0 for _, clusterMap := range c.remoteClusters { out += len(clusterMap) } return out } // NewController returns a new secret controller func NewController(kubeclientset kube.Client, namespace string, localClusterID cluster.ID) *Controller { secretsInformer := cache.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { opts.LabelSelector = MultiClusterSecretLabel + "=true" return kubeclientset.CoreV1().Secrets(namespace).List(context.TODO(), opts) }, WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { opts.LabelSelector = MultiClusterSecretLabel + "=true" return kubeclientset.CoreV1().Secrets(namespace).Watch(context.TODO(), opts) }, }, &corev1.Secret{}, 0, cache.Indexers{}, ) // init gauges localClusters.Record(1.0) remoteClusters.Record(0.0) controller := &Controller{ namespace: namespace, localClusterID: localClusterID, localClusterClient: kubeclientset, cs: newClustersStore(), informer: secretsInformer, syncInterval: 100 * time.Millisecond, } controller.queue = controllers.NewQueue("multicluster secret", controllers.WithReconciler(controller.processItem)) secretsInformer.AddEventHandler(controllers.ObjectHandler(controller.queue.AddObject)) return controller } // Run starts the controller until it receives a message over stopCh func (c *Controller) Run(stopCh <-chan struct{}) error { // run handlers for the local cluster; do not store this *Cluster in the ClusterStore or give it a SyncTimeout // this is done outside the goroutine, we should block other Run/startFuncs until this is registered localCluster := &Cluster{Client: c.localClusterClient, ID: c.localClusterID} if err := c.handleAdd(localCluster, stopCh); err != nil { return fmt.Errorf("failed initializing local cluster %s: %v", c.localClusterID, err) } go func() { t0 := time.Now() log.Info("Starting multicluster remote secrets controller") go c.informer.Run(stopCh) if !kube.WaitForCacheSyncInterval(stopCh, c.syncInterval, c.informer.HasSynced) { log.Error("Failed to sync multicluster remote secrets controller cache") return } log.Infof("multicluster remote secrets controller cache synced in %v", time.Since(t0)) if features.RemoteClusterTimeout != 0 { time.AfterFunc(features.RemoteClusterTimeout, func() { c.remoteSyncTimeout.Store(true) }) } c.queue.Run(stopCh) }() return nil } func (c *Controller) hasSynced() bool { if !c.queue.HasSynced() { log.Debug("secret controller did not sync secrets presented at startup") // we haven't finished processing the secrets that were present at startup return false } c.cs.RLock() defer c.cs.RUnlock() for _, clusterMap := range c.cs.remoteClusters { for _, cluster := range clusterMap { if !cluster.HasSynced() { log.Debugf("remote cluster %s registered informers have not been synced up yet", cluster.ID) return false } } } return true } func (c *Controller) HasSynced() bool { synced := c.hasSynced() if synced { return true } if c.remoteSyncTimeout.Load() { c.once.Do(func() { log.Errorf("remote clusters failed to sync after %v", features.RemoteClusterTimeout) timeouts.Increment() }) return true } return synced } func (c *Controller) processItem(key types.NamespacedName) error { log.Infof("processing secret event for secret %s", key) obj, exists, err := c.informer.GetIndexer().GetByKey(key.String()) if err != nil { return fmt.Errorf("error fetching object %s error: %v", key, err) } if exists { log.Debugf("secret %s exists in informer cache, processing it", key) c.addSecret(key, obj.(*corev1.Secret)) } else { log.Debugf("secret %s does not exist in informer cache, deleting it", key) c.deleteSecret(key.String()) } remoteClusters.Record(float64(c.cs.Len())) return nil } // BuildClientsFromConfig creates kube.Clients from the provided kubeconfig. This is overridden for testing only var BuildClientsFromConfig = func(kubeConfig []byte) (kube.Client, error) { if len(kubeConfig) == 0 { return nil, errors.New("kubeconfig is empty") } rawConfig, err := clientcmd.Load(kubeConfig) if err != nil { return nil, fmt.Errorf("kubeconfig cannot be loaded: %v", err) } if err := clientcmd.Validate(*rawConfig); err != nil { return nil, fmt.Errorf("kubeconfig is not valid: %v", err) } if err := sanitizeKubeConfig(*rawConfig, features.InsecureKubeConfigOptions); err != nil { return nil, fmt.Errorf("kubeconfig is not allowed: %v", err) } clientConfig := clientcmd.NewDefaultClientConfig(*rawConfig, &clientcmd.ConfigOverrides{}) clients, err := kube.NewClient(clientConfig) if err != nil { return nil, fmt.Errorf("failed to create kube clients: %v", err) } return clients, nil } // sanitizeKubeConfig sanitizes a kubeconfig file to strip out insecure settings which may leak // confidential materials. // See https://github.com/kubernetes/kubectl/issues/697 func sanitizeKubeConfig(config api.Config, allowlist sets.Set) error { for k, auths := range config.AuthInfos { if ap := auths.AuthProvider; ap != nil { // We currently are importing 5 authenticators: gcp, azure, exec, and openstack switch ap.Name { case "oidc": // OIDC is safe as it doesn't read files or execute code. // create-remote-secret specifically supports OIDC so its probably important to not break this. default: if !allowlist.Contains(ap.Name) { // All the others - gcp, azure, exec, and openstack - are unsafe return fmt.Errorf("auth provider %s is not allowed", ap.Name) } } } if auths.ClientKey != "" && !allowlist.Contains("clientKey") { return fmt.Errorf("clientKey is not allowed") } if auths.ClientCertificate != "" && !allowlist.Contains("clientCertificate") { return fmt.Errorf("clientCertificate is not allowed") } if auths.TokenFile != "" && !allowlist.Contains("tokenFile") { return fmt.Errorf("tokenFile is not allowed") } if auths.Exec != nil && !allowlist.Contains("exec") { return fmt.Errorf("exec is not allowed") } // Reconstruct the AuthInfo so if a new field is added we will not include it without review config.AuthInfos[k] = &api.AuthInfo{ // LocationOfOrigin: Not needed ClientCertificate: auths.ClientCertificate, ClientCertificateData: auths.ClientCertificateData, ClientKey: auths.ClientKey, ClientKeyData: auths.ClientKeyData, Token: auths.Token, TokenFile: auths.TokenFile, Impersonate: auths.Impersonate, ImpersonateGroups: auths.ImpersonateGroups, ImpersonateUserExtra: auths.ImpersonateUserExtra, Username: auths.Username, Password: auths.Password, AuthProvider: auths.AuthProvider, // Included because it is sanitized above Exec: auths.Exec, // Extensions: Not needed, } // Other relevant fields that are not acted on: // * Cluster.Server (and ProxyURL). This allows the user to send requests to arbitrary URLs, enabling potential SSRF attacks. // However, we don't actually know what valid URLs are, so we cannot reasonably constrain this. Instead, // we try to limit what confidential information could be exfiltrated (from AuthInfo). Additionally, the user cannot control // the paths we send requests to, limiting potential attack scope. // * Cluster.CertificateAuthority. While this reads from files, the result is not attached to the request and is instead // entirely local } return nil } func (c *Controller) createRemoteCluster(kubeConfig []byte, clusterID string) (*Cluster, error) { clients, err := BuildClientsFromConfig(kubeConfig) if err != nil { return nil, err } return &Cluster{ ID: cluster.ID(clusterID), Client: clients, stop: make(chan struct{}), // for use inside the package, to close on cleanup initialSync: atomic.NewBool(false), SyncTimeout: &c.remoteSyncTimeout, kubeConfigSha: sha256.Sum256(kubeConfig), }, nil } func (c *Controller) addSecret(name types.NamespacedName, s *corev1.Secret) { secretKey := name.String() // First delete clusters existingClusters := c.cs.GetExistingClustersFor(secretKey) for _, existingCluster := range existingClusters { if _, ok := s.Data[string(existingCluster.ID)]; !ok { c.deleteCluster(secretKey, existingCluster.ID) } } for clusterID, kubeConfig := range s.Data { if cluster.ID(clusterID) == c.localClusterID { log.Infof("ignoring cluster %v from secret %v as it would overwrite the local cluster", clusterID, secretKey) continue } action, callback := "Adding", c.handleAdd if prev := c.cs.Get(secretKey, cluster.ID(clusterID)); prev != nil { action, callback = "Updating", c.handleUpdate // clusterID must be unique even across multiple secrets kubeConfigSha := sha256.Sum256(kubeConfig) if bytes.Equal(kubeConfigSha[:], prev.kubeConfigSha[:]) { log.Infof("skipping update of cluster_id=%v from secret=%v: (kubeconfig are identical)", clusterID, secretKey) continue } // stop previous remote cluster prev.Stop() } else if c.cs.Contains(cluster.ID(clusterID)) { // if the cluster has been registered before by another secret, ignore the new one. log.Warnf("cluster %d from secret %s has already been registered", clusterID, secretKey) continue } log.Infof("%s cluster %v from secret %v", action, clusterID, secretKey) remoteCluster, err := c.createRemoteCluster(kubeConfig, clusterID) if err != nil { log.Errorf("%s cluster_id=%v from secret=%v: %v", action, clusterID, secretKey, err) continue } c.cs.Store(secretKey, remoteCluster.ID, remoteCluster) if err := callback(remoteCluster, remoteCluster.stop); err != nil { log.Errorf("%s cluster_id from secret=%v: %s %v", action, clusterID, secretKey, err) continue } log.Infof("finished callback for %s and starting to sync", clusterID) go remoteCluster.Run() } log.Infof("Number of remote clusters: %d", c.cs.Len()) } func (c *Controller) deleteSecret(secretKey string) { for _, cluster := range c.cs.GetExistingClustersFor(secretKey) { if cluster.ID == c.localClusterID { log.Infof("ignoring delete cluster %v from secret %v as it would overwrite the local cluster", c.localClusterID, secretKey) continue } log.Infof("Deleting cluster_id=%v configured by secret=%v", cluster.ID, secretKey) close(cluster.stop) err := c.handleDelete(cluster.ID) if err != nil { log.Errorf("Error removing cluster_id=%v configured by secret=%v: %v", cluster.ID, secretKey, err) } c.cs.Delete(secretKey, cluster.ID) } log.Infof("Number of remote clusters: %d", c.cs.Len()) } func (c *Controller) deleteCluster(secretKey string, clusterID cluster.ID) { c.cs.Lock() defer func() { c.cs.Unlock() log.Infof("Number of remote clusters: %d", c.cs.Len()) }() log.Infof("Deleting cluster_id=%v configured by secret=%v", clusterID, secretKey) close(c.cs.remoteClusters[secretKey][clusterID].stop) err := c.handleDelete(clusterID) if err != nil { log.Errorf("Error removing cluster_id=%v configured by secret=%v: %v", clusterID, secretKey, err) } delete(c.cs.remoteClusters[secretKey], clusterID) } func (c *Controller) handleAdd(cluster *Cluster, stop <-chan struct{}) error { var errs *multierror.Error for _, handler := range c.handlers { errs = multierror.Append(errs, handler.ClusterAdded(cluster, stop)) } return errs.ErrorOrNil() } func (c *Controller) handleUpdate(cluster *Cluster, stop <-chan struct{}) error { var errs *multierror.Error for _, handler := range c.handlers { errs = multierror.Append(errs, handler.ClusterUpdated(cluster, stop)) } return errs.ErrorOrNil() } func (c *Controller) handleDelete(key cluster.ID) error { var errs *multierror.Error for _, handler := range c.handlers { errs = multierror.Append(errs, handler.ClusterDeleted(key)) } return errs.ErrorOrNil() } // ListRemoteClusters provides debug info about connected remote clusters. func (c *Controller) ListRemoteClusters() []cluster.DebugInfo { var out []cluster.DebugInfo for secretName, clusters := range c.cs.All() { for clusterID, c := range clusters { syncStatus := "syncing" if c.HasSynced() { syncStatus = "synced" } else if c.SyncDidTimeout() { syncStatus = "timeout" } out = append(out, cluster.DebugInfo{ ID: clusterID, SecretName: secretName, SyncStatus: syncStatus, }) } } return out } func (c *Controller) GetRemoteKubeClient(clusterID cluster.ID) kubernetes.Interface { if remoteCluster := c.cs.GetByID(clusterID); remoteCluster != nil { return remoteCluster.Client } return nil }