internal/k8sCommon/k8sclient/replicaset.go (126 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: MIT package k8sclient import ( "context" "errors" "fmt" "log" "sync" "time" appsv1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "github.com/aws/amazon-cloudwatch-agent/internal/containerinsightscommon" ) type ReplicaSetClient interface { ReplicaSetToDeployment() map[string]string Init() Shutdown() } type replicaSetClient struct { sync.RWMutex stopChan chan struct{} store *ObjStore inited bool cachedReplicaSetMap map[string]time.Time replicaSetToDeploymentMap map[string]string } func (c *replicaSetClient) ReplicaSetToDeployment() map[string]string { if !c.inited { c.Init() } if c.store.Refreshed() { c.refresh() } c.RLock() defer c.RUnlock() return c.replicaSetToDeploymentMap } func (c *replicaSetClient) refresh() { c.Lock() defer c.Unlock() objsList := c.store.List() tmpMap := make(map[string]string) for _, obj := range objsList { replicaSet := obj.(*replicaSetInfo) ownerLoop: for _, owner := range replicaSet.owners { if owner.kind == containerinsightscommon.Deployment && owner.name != "" { tmpMap[replicaSet.name] = owner.name break ownerLoop } } } if c.replicaSetToDeploymentMap == nil { c.replicaSetToDeploymentMap = make(map[string]string) } if c.cachedReplicaSetMap == nil { c.cachedReplicaSetMap = make(map[string]time.Time) } lastRefreshTime := time.Now() for k, v := range c.cachedReplicaSetMap { if lastRefreshTime.Sub(v) > cacheTTL { delete(c.replicaSetToDeploymentMap, k) delete(c.cachedReplicaSetMap, k) } } for k, v := range tmpMap { c.replicaSetToDeploymentMap[k] = v c.cachedReplicaSetMap[k] = lastRefreshTime } } func (c *replicaSetClient) Init() { c.Lock() defer c.Unlock() if c.inited { return } c.stopChan = make(chan struct{}) c.store = NewObjStore(transformFuncReplicaSet) lw := createReplicaSetListWatch(Get().ClientSet, metav1.NamespaceAll) reflector := cache.NewReflector(lw, &appsv1.ReplicaSet{}, c.store, 0) go reflector.Run(c.stopChan) if err := wait.Poll(50*time.Millisecond, 2*time.Second, func() (done bool, err error) { return reflector.LastSyncResourceVersion() != "", nil }); err != nil { log.Printf("W! ReplicaSet initial sync timeout: %v", err) } c.inited = true } func (c *replicaSetClient) Shutdown() { c.Lock() defer c.Unlock() if !c.inited { return } close(c.stopChan) c.inited = false } func transformFuncReplicaSet(obj interface{}) (interface{}, error) { replicaSet, ok := obj.(*appsv1.ReplicaSet) if !ok { return nil, errors.New(fmt.Sprintf("input obj %v is not ReplicaSet type", obj)) } info := new(replicaSetInfo) info.name = replicaSet.Name info.owners = []*replicaSetOwner{} for _, owner := range replicaSet.OwnerReferences { info.owners = append(info.owners, &replicaSetOwner{kind: owner.Kind, name: owner.Name}) } return info, nil } func createReplicaSetListWatch(client kubernetes.Interface, ns string) cache.ListerWatcher { ctx := context.Background() return &cache.ListWatch{ ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { opts.ResourceVersion = "" // Passing empty context as this was not required by old List() return client.AppsV1().ReplicaSets(ns).List(ctx, opts) }, WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { // Passing empty context as this was not required by old Watch() return client.AppsV1().ReplicaSets(ns).Watch(ctx, opts) }, } }