internal/k8sCommon/k8sclient/replicaset.go (126 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT
package k8sclient
import (
"context"
"errors"
"fmt"
"log"
"sync"
"time"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"github.com/aws/amazon-cloudwatch-agent/internal/containerinsightscommon"
)
// ReplicaSetClient provides a lookup from ReplicaSet names to the names of the
// Deployments that own them, together with lifecycle control of the
// underlying watcher.
type ReplicaSetClient interface {
// ReplicaSetToDeployment returns a map keyed by ReplicaSet name whose values
// are the owning Deployment names.
ReplicaSetToDeployment() map[string]string
// Init starts the client. The implementation in this file is a no-op when
// the client is already initialized.
Init()
// Shutdown stops the client. The implementation in this file is a no-op
// when the client is not initialized.
Shutdown()
}
// replicaSetClient implements ReplicaSetClient on top of a reflector-fed
// object store.
type replicaSetClient struct {
// Taken for writing by Init, Shutdown and refresh while they modify the
// fields below, and for reading when the mapping is returned to callers.
sync.RWMutex
// stopChan is handed to the reflector's Run in Init and closed by Shutdown.
stopChan chan struct{}
// store is created in Init with transformFuncReplicaSet and is the sink the
// reflector writes to; refresh reads it through List.
store *ObjStore
// inited is set to true at the end of Init and back to false by Shutdown.
inited bool
// cachedReplicaSetMap records, per ReplicaSet name, the time of the last
// refresh in which that ReplicaSet was present in the store. refresh uses
// it to expire entries older than cacheTTL.
cachedReplicaSetMap map[string]time.Time
// replicaSetToDeploymentMap maps a ReplicaSet name to the name of its
// owning Deployment; it is the value returned by ReplicaSetToDeployment.
replicaSetToDeploymentMap map[string]string
}
// ReplicaSetToDeployment returns the mapping from ReplicaSet name to owning
// Deployment name.
//
// The client is initialized lazily on first use, and the mapping is rebuilt
// whenever the store reports that it has been refreshed since the last call.
//
// The returned map is the client's own map, not a copy; callers must not
// modify it. It is nil if no refresh has happened yet.
func (c *replicaSetClient) ReplicaSetToDeployment() map[string]string {
	// inited is written by Init and Shutdown under the write lock, so it must
	// be read under the lock as well. The lock cannot be held across Init
	// because Init acquires the write lock itself.
	c.RLock()
	ready := c.inited
	c.RUnlock()
	if !ready {
		// Init re-checks inited under the write lock, so concurrent callers
		// racing to this point still initialize only once.
		c.Init()
	}

	// store is (re)assigned by Init; take a consistent snapshot of the pointer
	// before using it outside the lock.
	c.RLock()
	store := c.store
	c.RUnlock()
	if store.Refreshed() {
		// refresh takes the write lock, so it must be called with no lock held.
		c.refresh()
	}

	c.RLock()
	defer c.RUnlock()
	return c.replicaSetToDeploymentMap
}
// refresh rebuilds the ReplicaSet-to-Deployment mapping from the current
// contents of the store.
//
// Every ReplicaSet in the store that has a Deployment owner with a non-empty
// name is recorded (using the first such owner) and stamped with the current
// time. Entries from earlier refreshes that are no longer in the store are
// kept until their stamp is older than cacheTTL, then dropped.
//
// The new mapping is assembled in a fresh map and swapped in at the end
// instead of editing the existing map in place: ReplicaSetToDeployment hands
// the map itself to callers, who read it after the lock has been released, so
// an in-place edit would be a concurrent map read and write.
func (c *replicaSetClient) refresh() {
	c.Lock()
	defer c.Unlock()

	// Collect the Deployment-owned ReplicaSets currently present in the store.
	current := make(map[string]string)
	for _, obj := range c.store.List() {
		rs, ok := obj.(*replicaSetInfo)
		if !ok {
			// Skip anything that is not a replicaSetInfo rather than panicking.
			continue
		}
		for _, owner := range rs.owners {
			if owner.kind == containerinsightscommon.Deployment && owner.name != "" {
				current[rs.name] = owner.name
				break
			}
		}
	}

	if c.cachedReplicaSetMap == nil {
		c.cachedReplicaSetMap = make(map[string]time.Time)
	}

	// Start from a copy of the previously published mapping so that the map
	// already handed out to callers is never modified.
	next := make(map[string]string, len(c.replicaSetToDeploymentMap)+len(current))
	for name, deployment := range c.replicaSetToDeploymentMap {
		next[name] = deployment
	}

	// Drop entries whose last sighting is older than the TTL. Expiry runs
	// before the current entries are re-stamped, matching the order in which
	// entries seen in this refresh are always retained.
	now := time.Now()
	for name, lastSeen := range c.cachedReplicaSetMap {
		if now.Sub(lastSeen) > cacheTTL {
			delete(next, name)
			delete(c.cachedReplicaSetMap, name)
		}
	}

	for name, deployment := range current {
		next[name] = deployment
		c.cachedReplicaSetMap[name] = now
	}

	c.replicaSetToDeploymentMap = next
}
// Init creates the object store and starts a reflector that feeds it with
// ReplicaSets from all namespaces. It is a no-op when the client is already
// initialized.
//
// After starting the reflector, Init polls every 50ms for up to 2s until the
// reflector reports a non-empty last-synced resource version. A timeout is
// logged as a warning and does not prevent the client from being marked as
// initialized. The write lock is held for the whole call.
func (c *replicaSetClient) Init() {
	c.Lock()
	defer c.Unlock()

	if c.inited {
		return
	}

	c.stopChan = make(chan struct{})
	c.store = NewObjStore(transformFuncReplicaSet)

	listWatch := createReplicaSetListWatch(Get().ClientSet, metav1.NamespaceAll)
	rsReflector := cache.NewReflector(listWatch, &appsv1.ReplicaSet{}, c.store, 0)
	// The reflector runs until stopChan is closed by Shutdown.
	go rsReflector.Run(c.stopChan)

	// A non-empty resource version is treated as "initial sync done".
	hasSynced := func() (bool, error) {
		return rsReflector.LastSyncResourceVersion() != "", nil
	}
	err := wait.Poll(50*time.Millisecond, 2*time.Second, hasSynced)
	if err != nil {
		log.Printf("W! ReplicaSet initial sync timeout: %v", err)
	}

	c.inited = true
}
// Shutdown closes the stop channel that was handed to the reflector in Init
// and marks the client as not initialized. Calling it on a client that is not
// initialized does nothing, so the channel is never closed twice.
func (c *replicaSetClient) Shutdown() {
	c.Lock()
	defer c.Unlock()

	if c.inited {
		close(c.stopChan)
		c.inited = false
	}
}
// transformFuncReplicaSet reduces a *appsv1.ReplicaSet to a *replicaSetInfo
// that carries only the ReplicaSet's name and the kind and name of each of its
// owner references. It returns an error when obj is not a *appsv1.ReplicaSet.
func transformFuncReplicaSet(obj interface{}) (interface{}, error) {
	rs, isReplicaSet := obj.(*appsv1.ReplicaSet)
	if !isReplicaSet {
		msg := fmt.Sprintf("input obj %v is not ReplicaSet type", obj)
		return nil, errors.New(msg)
	}

	// owners is always non-nil, even when there are no owner references.
	refs := rs.OwnerReferences
	owners := make([]*replicaSetOwner, 0, len(refs))
	for i := range refs {
		owners = append(owners, &replicaSetOwner{kind: refs[i].Kind, name: refs[i].Name})
	}

	return &replicaSetInfo{name: rs.Name, owners: owners}, nil
}
// createReplicaSetListWatch builds a ListerWatcher that lists and watches
// ReplicaSets in namespace ns through the given client. Both calls use a
// single background context created here.
func createReplicaSetListWatch(client kubernetes.Interface, ns string) cache.ListerWatcher {
	// A plain background context is used for both calls; it carries no
	// deadline or cancellation.
	ctx := context.Background()

	listReplicaSets := func(opts metav1.ListOptions) (runtime.Object, error) {
		// The resource version supplied by the caller is always cleared
		// before listing.
		// NOTE(review): presumably to force a read of the latest state - confirm.
		opts.ResourceVersion = ""
		return client.AppsV1().ReplicaSets(ns).List(ctx, opts)
	}

	watchReplicaSets := func(opts metav1.ListOptions) (watch.Interface, error) {
		return client.AppsV1().ReplicaSets(ns).Watch(ctx, opts)
	}

	return &cache.ListWatch{
		ListFunc:  listReplicaSets,
		WatchFunc: watchReplicaSets,
	}
}