pkg/helper/k8smeta/k8s_meta_cache.go (293 lines of code) (raw):
package k8smeta
import (
"context"
"fmt"
"time"
app "k8s.io/api/apps/v1"
batch "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
networking "k8s.io/api/networking/v1"
storage "k8s.io/api/storage/v1"
meta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"github.com/alibaba/ilogtail/pkg/logger"
)
const hostIPIndexPrefix = "host/"
type k8sMetaCache struct {
metaStore *DeferredDeletionMetaStore
clientset *kubernetes.Clientset
eventCh chan *K8sMetaEvent
stopCh chan struct{}
resourceType string
schema *runtime.Scheme
}
func newK8sMetaCache(stopCh chan struct{}, resourceType string) *k8sMetaCache {
idxRules := getIdxRules(resourceType)
m := &k8sMetaCache{}
m.eventCh = make(chan *K8sMetaEvent, 100)
m.stopCh = stopCh
m.metaStore = NewDeferredDeletionMetaStore(m.eventCh, m.stopCh, 120, cache.MetaNamespaceKeyFunc, idxRules...)
m.resourceType = resourceType
m.schema = runtime.NewScheme()
_ = v1.AddToScheme(m.schema)
_ = batch.AddToScheme(m.schema)
_ = app.AddToScheme(m.schema)
_ = networking.AddToScheme(m.schema)
_ = storage.AddToScheme(m.schema)
return m
}
func (m *k8sMetaCache) init(clientset *kubernetes.Clientset) {
m.clientset = clientset
m.metaStore.Start()
m.watch(m.stopCh)
}
func (m *k8sMetaCache) Get(key []string) map[string][]*ObjectWrapper {
return m.metaStore.Get(key)
}
func (m *k8sMetaCache) GetSize() int {
return len(m.metaStore.Items)
}
func (m *k8sMetaCache) GetQueueSize() int {
return len(m.eventCh)
}
func (m *k8sMetaCache) List() []*ObjectWrapper {
return m.metaStore.List()
}
func (m *k8sMetaCache) Filter(filterFunc func(*ObjectWrapper) bool, limit int) []*ObjectWrapper {
return m.metaStore.Filter(filterFunc, limit)
}
func (m *k8sMetaCache) RegisterSendFunc(key string, sendFunc SendFunc, interval int) {
m.metaStore.RegisterSendFunc(key, sendFunc, interval)
logger.Debug(context.Background(), "register send func", m.resourceType)
}
func (m *k8sMetaCache) UnRegisterSendFunc(key string) {
m.metaStore.UnRegisterSendFunc(key)
}
func (m *k8sMetaCache) watch(stopCh <-chan struct{}) {
factory, informer := m.getFactoryInformer()
if informer == nil {
return
}
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
nowTime := time.Now().Unix()
m.eventCh <- &K8sMetaEvent{
EventType: EventTypeAdd,
Object: &ObjectWrapper{
ResourceType: m.resourceType,
Raw: m.preProcess(obj),
FirstObservedTime: nowTime,
LastObservedTime: nowTime,
},
}
metaManager.addEventCount.Add(1)
},
UpdateFunc: func(oldObj interface{}, obj interface{}) {
nowTime := time.Now().Unix()
m.eventCh <- &K8sMetaEvent{
EventType: EventTypeUpdate,
Object: &ObjectWrapper{
ResourceType: m.resourceType,
Raw: m.preProcess(obj),
FirstObservedTime: nowTime,
LastObservedTime: nowTime,
},
}
metaManager.updateEventCount.Add(1)
},
DeleteFunc: func(obj interface{}) {
m.eventCh <- &K8sMetaEvent{
EventType: EventTypeDelete,
Object: &ObjectWrapper{
ResourceType: m.resourceType,
Raw: m.preProcess(obj),
LastObservedTime: time.Now().Unix(),
},
}
metaManager.deleteEventCount.Add(1)
},
})
go factory.Start(stopCh)
// wait infinite for first cache sync success
for {
if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
logger.Error(context.Background(), "K8S_META_CACHE_SYNC_TIMEOUT", "service cache sync timeout")
time.Sleep(1 * time.Second)
} else {
break
}
}
}
func (m *k8sMetaCache) getFactoryInformer() (informers.SharedInformerFactory, cache.SharedIndexInformer) {
var factory informers.SharedInformerFactory
switch m.resourceType {
case POD:
factory = informers.NewSharedInformerFactory(m.clientset, time.Hour*24)
default:
factory = informers.NewSharedInformerFactory(m.clientset, time.Hour*1)
}
var informer cache.SharedIndexInformer
switch m.resourceType {
case POD:
informer = factory.Core().V1().Pods().Informer()
case SERVICE:
informer = factory.Core().V1().Services().Informer()
case DEPLOYMENT:
informer = factory.Apps().V1().Deployments().Informer()
case REPLICASET:
informer = factory.Apps().V1().ReplicaSets().Informer()
case STATEFULSET:
informer = factory.Apps().V1().StatefulSets().Informer()
case DAEMONSET:
informer = factory.Apps().V1().DaemonSets().Informer()
case CRONJOB:
informer = factory.Batch().V1().CronJobs().Informer()
case JOB:
informer = factory.Batch().V1().Jobs().Informer()
case NODE:
informer = factory.Core().V1().Nodes().Informer()
case NAMESPACE:
informer = factory.Core().V1().Namespaces().Informer()
case CONFIGMAP:
informer = factory.Core().V1().ConfigMaps().Informer()
case PERSISTENTVOLUME:
informer = factory.Core().V1().PersistentVolumes().Informer()
case PERSISTENTVOLUMECLAIM:
informer = factory.Core().V1().PersistentVolumeClaims().Informer()
case STORAGECLASS:
informer = factory.Storage().V1().StorageClasses().Informer()
case INGRESS:
informer = factory.Networking().V1().Ingresses().Informer()
default:
logger.Error(context.Background(), "ENTITY_PIPELINE_REGISTER_ERROR", "resourceType not support", m.resourceType)
return factory, nil
}
return factory, informer
}
func getIdxRules(resourceType string) []IdxFunc {
switch resourceType {
case NODE:
return []IdxFunc{generateNodeKey}
case POD:
return []IdxFunc{generateCommonKey, generatePodIPKey, generateContainerIDKey, generateHostIPKey}
case SERVICE:
return []IdxFunc{generateCommonKey, generateServiceIPKey}
default:
return []IdxFunc{generateCommonKey}
}
}
func (m *k8sMetaCache) preProcess(obj interface{}) interface{} {
switch m.resourceType {
case POD:
return m.preProcessPod(obj)
default:
return m.preProcessCommon(obj)
}
}
func (m *k8sMetaCache) preProcessCommon(obj interface{}) interface{} {
runtimeObj, ok := obj.(runtime.Object)
if !ok {
logger.Error(context.Background(), "K8S_META_PRE_PROCESS_ERROR", "object is not runtime object", obj)
return obj
}
metaObj, err := meta.Accessor(runtimeObj)
if err != nil {
logger.Error(context.Background(), "K8S_META_PRE_PROCESS_ERROR", "object is not meta object", err)
return obj
}
// fill empty kind
if runtimeObj.GetObjectKind().GroupVersionKind().Empty() {
gvk, err := apiutil.GVKForObject(runtimeObj, m.schema)
if err != nil {
logger.Error(context.Background(), "K8S_META_PRE_PROCESS_ERROR", "get GVK for object error", err)
return obj
}
runtimeObj.GetObjectKind().SetGroupVersionKind(gvk)
}
// remove unnecessary annotations
if metaObj.GetAnnotations() != nil {
if _, ok := metaObj.GetAnnotations()["kubectl.kubernetes.io/last-applied-configuration"]; ok {
metaObj.GetAnnotations()["kubectl.kubernetes.io/last-applied-configuration"] = ""
}
}
return runtimeObj
}
func (m *k8sMetaCache) preProcessPod(obj interface{}) interface{} {
m.preProcessCommon(obj)
pod, ok := obj.(*v1.Pod)
if !ok {
logger.Error(context.Background(), "K8S_META_PRE_PROCESS_ERROR", "object is not pod", obj)
return obj
}
pod.ManagedFields = nil
pod.Status.Conditions = nil
pod.Spec.Tolerations = nil
return pod
}
func generateCommonKey(obj interface{}) ([]string, error) {
meta, err := meta.Accessor(obj)
if err != nil {
return []string{}, err
}
return []string{generateNameWithNamespaceKey(meta.GetNamespace(), meta.GetName())}, nil
}
func generateNodeKey(obj interface{}) ([]string, error) {
node, err := meta.Accessor(obj)
if err != nil {
return []string{}, err
}
return []string{node.GetName()}, nil
}
func generateNameWithNamespaceKey(namespace, name string) string {
return fmt.Sprintf("%s/%s", namespace, name)
}
func generatePodIPKey(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return []string{}, fmt.Errorf("object is not a pod")
}
return []string{pod.Status.PodIP}, nil
}
func generateContainerIDKey(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return []string{}, fmt.Errorf("object is not a pod")
}
result := make([]string, len(pod.Status.ContainerStatuses))
for i, containerStatus := range pod.Status.ContainerStatuses {
result[i] = truncateContainerID(containerStatus.ContainerID)
}
return result, nil
}
func generateHostIPKey(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return []string{}, fmt.Errorf("object is not a pod")
}
return []string{addHostIPIndexPrefex(pod.Status.HostIP)}, nil
}
func addHostIPIndexPrefex(ip string) string {
return hostIPIndexPrefix + ip
}
func generateServiceIPKey(obj interface{}) ([]string, error) {
svc, ok := obj.(*v1.Service)
if !ok {
return []string{}, fmt.Errorf("object is not a service")
}
results := make([]string, 0)
for _, ip := range svc.Spec.ClusterIPs {
if ip != "" {
results = append(results, ip)
}
}
for _, ip := range svc.Spec.ExternalIPs {
if ip != "" {
results = append(results, ip)
}
}
if svc.Spec.LoadBalancerIP != "" {
results = append(results, svc.Spec.LoadBalancerIP)
}
return results, nil
}