internal/k8sCommon/k8sclient/endpoint.go (163 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT
package k8sclient
import (
"context"
"errors"
"fmt"
"log"
"sync"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/klog/v2/klogr"
"github.com/aws/amazon-cloudwatch-agent/internal/containerinsightscommon"
"github.com/aws/amazon-cloudwatch-agent/internal/k8sCommon/k8sutil"
)
// Service identifies a Kubernetes service by its name and the namespace it
// belongs to. Both fields are strings, so a Service value is comparable and
// can be used directly as a map key.
type Service struct {
	// ServiceName is the name of the service.
	ServiceName string
	// Namespace is the namespace the service belongs to.
	Namespace string
}
// NewService builds a Service value from the given service name and namespace.
func NewService(name, namespace string) Service {
	var svc Service
	svc.ServiceName = name
	svc.Namespace = namespace
	return svc
}
// EpClient exposes pod/service relationships and the lifecycle controls of
// the client that provides them.
type EpClient interface {
	// Init prepares the client for use.
	Init()
	// Shutdown stops the client.
	Shutdown()

	// PodKeyToServiceNames returns a map from pod key to a list of service
	// names associated with that pod.
	PodKeyToServiceNames() map[string][]string
	// ServiceToPodNum returns a map from service to a pod count.
	ServiceToPodNum() map[Service]int
}
// epClient caches pod/service relationships built from the endpoint objects
// held in its store. Its methods (with pointer receivers) provide the EpClient
// method set.
type epClient struct {
	// RWMutex is taken by the methods of epClient when they read or replace
	// the fields below.
	sync.RWMutex

	// stopChan is created by Init, handed to the reflector, and closed by
	// Shutdown.
	stopChan chan struct{}
	// store is the object store the reflector writes into; refresh reads its
	// contents to rebuild the cached maps.
	store *ObjStore

	// inited is set to true at the end of Init and back to false by Shutdown.
	inited bool

	// podKeyToServiceNamesMap maps a pod key to the names of the services
	// whose endpoints list that pod.
	podKeyToServiceNamesMap map[string][]string
	// serviceToPodNumMap maps a service to the number of pod keys listed in
	// its endpoints. Only running pods will show behind endpoints.
	serviceToPodNumMap map[Service]int
}
// PodKeyToServiceNames returns the cached map from pod key to the names of
// the services whose endpoints list that pod.
//
// The client is initialized lazily if it has not been initialized yet, and
// the cached maps are rebuilt first whenever the store reports that it has
// been refreshed. The returned map is the client's own cached map (no copy is
// made), so callers must treat it as read-only.
func (c *epClient) PodKeyToServiceNames() map[string][]string {
	// inited is written by Init and Shutdown while holding the write lock, so
	// it must be read under the lock as well to avoid a data race. The lock is
	// released before calling Init, which takes the write lock and re-checks
	// the flag itself.
	c.RLock()
	inited := c.inited
	c.RUnlock()
	if !inited {
		c.Init()
	}

	if c.store.Refreshed() {
		c.refresh()
	}

	c.RLock()
	defer c.RUnlock()
	return c.podKeyToServiceNamesMap
}
// ServiceToPodNum returns the cached map from service to the number of pod
// keys listed in that service's endpoints.
//
// The client is initialized lazily if it has not been initialized yet, and
// the cached maps are rebuilt first whenever the store reports that it has
// been refreshed. The returned map is the client's own cached map (no copy is
// made), so callers must treat it as read-only.
func (c *epClient) ServiceToPodNum() map[Service]int {
	// inited is written by Init and Shutdown while holding the write lock, so
	// it must be read under the lock as well to avoid a data race. The lock is
	// released before calling Init, which takes the write lock and re-checks
	// the flag itself.
	c.RLock()
	inited := c.inited
	c.RUnlock()
	if !inited {
		c.Init()
	}

	if c.store.Refreshed() {
		c.refresh()
	}

	c.RLock()
	defer c.RUnlock()
	return c.serviceToPodNumMap
}
// refresh rebuilds both cached maps from the objects currently in the store
// and installs the new maps while holding the write lock. The previous maps
// are replaced, not modified in place.
func (c *epClient) refresh() {
	c.Lock()
	defer c.Unlock()

	// Pod key -> set of service names; a set is used so that a service is
	// recorded at most once per pod.
	servicesByPod := make(map[string]map[string]struct{})
	podNumByService := make(map[Service]int)

	for _, item := range c.store.List() {
		info := item.(*endpointInfo)

		// Each object should be a unique service.
		// Services with 0 pods are ignored.
		if podCount := len(info.podKeyList); podCount > 0 {
			podNumByService[NewService(info.name, info.namespace)] = podCount
		}

		for _, podKey := range info.podKeyList {
			nameSet, found := servicesByPod[podKey]
			if !found {
				nameSet = make(map[string]struct{})
				servicesByPod[podKey] = nameSet
			}
			nameSet[info.name] = struct{}{}
		}
	}

	// Flatten each set of service names into a slice. The order of names
	// within a slice follows map iteration order and is therefore unspecified.
	namesByPod := make(map[string][]string)
	for podKey, nameSet := range servicesByPod {
		names := make([]string, 0, len(nameSet))
		for name := range nameSet {
			names = append(names, name)
		}
		namesByPod[podKey] = names
	}

	c.podKeyToServiceNamesMap = namesByPod
	c.serviceToPodNumMap = podNumByService
}
// Init creates the stop channel and the object store, starts a reflector that
// lists and watches Endpoints in all namespaces into that store, and then
// waits for the reflector's first sync.
//
// The wait polls every 50ms for up to 2s until the reflector reports a
// non-empty last-sync resource version. If the wait fails, a warning is logged
// and the client is still marked as initialized. Calling Init on a client that
// is already initialized is a no-op. The write lock is held for the whole call.
func (c *epClient) Init() {
	c.Lock()
	defer c.Unlock()
	if c.inited {
		return
	}

	c.stopChan = make(chan struct{})
	c.store = NewObjStore(transformFuncEndpoint)

	listWatch := createEndpointListWatch(Get().ClientSet, metav1.NamespaceAll)
	r := cache.NewReflector(listWatch, &v1.Endpoints{}, c.store, 0)

	// Install the klog logger before the reflector goroutine starts running.
	klog.SetLogger(klogr.New().WithName("k8s_client_runtime").V(3))
	go r.Run(c.stopChan)

	// synced reports whether the reflector has recorded a resource version.
	synced := func() (done bool, err error) {
		return r.LastSyncResourceVersion() != "", nil
	}
	err := wait.Poll(50*time.Millisecond, 2*time.Second, synced)
	if err != nil {
		log.Printf("W! Endpoint initial sync timeout: %v", err)
	}

	c.inited = true
}
// Shutdown closes the stop channel that Init passed to the reflector and marks
// the client as uninitialized. It does nothing when the client is not
// currently initialized, so the channel is never closed twice.
func (c *epClient) Shutdown() {
	c.Lock()
	defer c.Unlock()

	if c.inited {
		close(c.stopChan)
		c.inited = false
	}
}
// transformFuncEndpoint converts a *v1.Endpoints object into an *endpointInfo
// holding the object's name, namespace, and the pod keys of every address in
// its subsets whose target reference is of kind Pod.
//
// Addresses without a target reference, or whose target is not a pod, are
// skipped. An address whose pod key comes back empty is logged and skipped.
// An error is returned when obj is not a *v1.Endpoints.
func transformFuncEndpoint(obj interface{}) (interface{}, error) {
	endpoints, isEndpoints := obj.(*v1.Endpoints)
	if !isEndpoints {
		return nil, errors.New(fmt.Sprintf("input obj %v is not Endpoint type", obj))
	}

	result := &endpointInfo{
		name:       endpoints.Name,
		namespace:  endpoints.Namespace,
		podKeyList: []string{},
	}

	// Ranging over a nil slice runs zero iterations, so no explicit nil
	// checks are needed on Subsets or Addresses.
	for _, subset := range endpoints.Subsets {
		for _, addr := range subset.Addresses {
			ref := addr.TargetRef
			if ref == nil || ref.Kind != containerinsightscommon.TypePod {
				continue
			}

			key := k8sutil.CreatePodKey(ref.Namespace, ref.Name)
			if key == "" {
				log.Printf("W! Invalid pod metadata, namespace: %s, podName: %s", ref.Namespace, ref.Name)
				continue
			}
			result.podKeyList = append(result.podKeyList, key)
		}
	}

	return result, nil
}
// createEndpointListWatch returns a ListerWatcher whose list and watch
// functions call the Endpoints API of the given client for namespace ns,
// using a background context for every request.
func createEndpointListWatch(client kubernetes.Interface, ns string) cache.ListerWatcher {
	ctx := context.Background()

	list := func(options metav1.ListOptions) (runtime.Object, error) {
		return client.CoreV1().Endpoints(ns).List(ctx, options)
	}
	watcher := func(options metav1.ListOptions) (watch.Interface, error) {
		return client.CoreV1().Endpoints(ns).Watch(ctx, options)
	}

	return &cache.ListWatch{
		ListFunc:  list,
		WatchFunc: watcher,
	}
}