pkg/process/finders/kubernetes/registry.go (114 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package kubernetes import ( "reflect" "time" "k8s.io/apimachinery/pkg/labels" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" ) const rsyncPeriod = 5 * time.Minute type Registry interface { Start(stopChan chan struct{}) BuildPodContainers() map[string]*PodContainer FindServiceName(namespace, podName string) string } type StaticNamespaceRegistry struct { podInformers []cache.SharedInformer serviceInformers []cache.SharedInformer podServiceNameCache map[string]string } func NewStaticNamespaceRegistry(cli *kubernetes.Clientset, namespaces []string, nodeName string) Registry { r := &StaticNamespaceRegistry{ podInformers: make([]cache.SharedInformer, 0), serviceInformers: make([]cache.SharedInformer, 0), podServiceNameCache: make(map[string]string), } for _, ns := range namespaces { podListWatch := cache.NewListWatchFromClient(cli.CoreV1().RESTClient(), "pods", ns, fields.OneTermEqualSelector("spec.nodeName", nodeName)) podInformer := cache.NewSharedInformer(podListWatch, &v1.Pod{}, rsyncPeriod) podInformer.AddEventHandler(r) r.podInformers = append(r.podInformers, podInformer) serviceListWatch := cache.NewListWatchFromClient(cli.CoreV1().RESTClient(), "services", ns, fields.Everything()) serviceInformer := cache.NewSharedInformer(serviceListWatch, &v1.Service{}, rsyncPeriod) serviceInformer.AddEventHandler(r) r.serviceInformers = append(r.serviceInformers, serviceInformer) } return r } func (r *StaticNamespaceRegistry) Start(stopChan chan struct{}) { for i := range r.podInformers { go r.podInformers[i].Run(stopChan) go r.serviceInformers[i].Run(stopChan) } } func (r *StaticNamespaceRegistry) BuildPodContainers() map[string]*PodContainer { // cgroupid -> container containers := make(map[string]*PodContainer) for _, in := range r.podInformers { list := in.GetStore().List() for _, p := range list { analyzeContainers := AnalyzeContainers(p.(*v1.Pod), r) for _, c := range analyzeContainers { id := c.CGroupID() if id != "" { containers[id] = c } } } } return containers } func (r *StaticNamespaceRegistry) FindServiceName(namespace, podName string) string { return r.podServiceNameCache[namespace+"_"+podName] } func (r *StaticNamespaceRegistry) recomposePodServiceName() { result := make(map[string]string) for i := range r.podInformers { for _, podT := range r.podInformers[i].GetStore().List() { for _, serviceT := range r.serviceInformers[i].GetStore().List() { pod := podT.(*v1.Pod) service := serviceT.(*v1.Service) if pod.Namespace != service.Namespace { continue } if len(service.Spec.Selector) == 0 { continue } if labels.Set(service.Spec.Selector).AsSelector().Matches(labels.Set(pod.ObjectMeta.Labels)) { // if multiple service selector matches the same pod // then must choose one by same logical existing := result[pod.Namespace+"_"+pod.Name] if existing != "" { existing = chooseServiceName(existing, service.Name) } else { existing = service.Name } result[pod.Namespace+"_"+pod.Name] = existing } } } } r.podServiceNameCache = result } func chooseServiceName(a, b string) string { // short name if len(a) < len(b) { return a } else if len(a) > len(b) { return b } // ascii compare if a < b { return a } return b } func (r *StaticNamespaceRegistry) OnAdd(d interface{}) { r.recomposePodServiceName() } func (r *StaticNamespaceRegistry) OnUpdate(d, u interface{}) { same := reflect.DeepEqual(d, u) if !same { r.recomposePodServiceName() } } func (r *StaticNamespaceRegistry) OnDelete(d interface{}) { r.recomposePodServiceName() }