pkg/controller/ipcache/ipcache.go (465 lines of code) (raw):

package ipcache import ( "container/list" "context" "fmt" "reflect" "sync" "sync/atomic" "time" "github.com/alibaba/kubeskoop/pkg/controller/rpc" "github.com/google/uuid" log "github.com/sirupsen/logrus" "golang.org/x/exp/slices" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" v1 "k8s.io/api/core/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/tools/cache" ) const ( MaxSyncBatchSize = 20 CompactThreshold = 10240 ) type changeLog struct { revision uint64 opcode rpc.OpCode entry *rpc.CacheEntry } type snapshot struct { lock sync.RWMutex revision uint64 entries map[string]*rpc.CacheEntry } type storage struct { revision atomic.Uint64 snapshot snapshot //full map[string]*rpc.CacheEntry log []*changeLog } type clientstate int const ( stale clientstate = 0 uptodate clientstate = 1 blocking clientstate = 2 ) type client struct { revision uint64 closed bool state clientstate ch chan *changeLog } type Service struct { rpc.UnimplementedIPCacheServiceServer storage storage clients *list.List clientsLock sync.RWMutex period string logChan chan *changeLog logLock sync.Mutex } func (s *Service) ListCache(_ context.Context, _ *rpc.ListCacheRequest) (*rpc.ListCacheResponse, error) { s.storage.snapshot.lock.RLock() defer s.storage.snapshot.lock.RUnlock() var entries []*rpc.CacheEntry for _, v := range s.storage.snapshot.entries { entries = append(entries, v) } return &rpc.ListCacheResponse{ Period: s.period, Revision: s.storage.snapshot.revision, Entries: entries, }, nil } func (s *Service) WatchCache(request *rpc.WatchCacheRequest, server rpc.IPCacheService_WatchCacheServer) error { if request.Period != s.period { return status.Error(codes.InvalidArgument, "invalid period") } s.clientsLock.Lock() s.storage.snapshot.lock.RLock() if request.Revision < s.storage.snapshot.revision { s.storage.snapshot.lock.RUnlock() s.clientsLock.Unlock() return status.Error(codes.DataLoss, "data loss") } c := &client{ revision: request.Revision, closed: false, state: stale, ch: make(chan *changeLog, 4*1024), } s.clients.PushBack(c) s.storage.snapshot.lock.RUnlock() s.clientsLock.Unlock() loop: for { select { case <-server.Context().Done(): c.closed = true break loop case cl := <-c.ch: err := server.Send(&rpc.WatchCacheResponse{ Revision: cl.revision, Opcode: cl.opcode, Entry: cl.entry, }) if err != nil { log.Warningf("failed send watch response to client: %v", err) c.closed = true break loop } } } return nil } func findNext(slice []*changeLog, target uint64) int { start := 0 end := len(slice) - 1 ret := -1 for start <= end { mid := (start + end) / 2 if slice[mid].revision <= target { start = mid + 1 } else { ret = mid end = mid - 1 } } return ret } func NewService(podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer) *Service { s := &Service{ storage: storage{ snapshot: snapshot{ entries: make(map[string]*rpc.CacheEntry), }, }, logChan: make(chan *changeLog, 10*1024), clients: list.New(), period: uuid.NewString(), } _, err := podInformer.Informer().AddEventHandler(&cache.ResourceEventHandlerFuncs{ AddFunc: s.onAddPod, DeleteFunc: s.onDeletePod, UpdateFunc: s.onUpdatePod, }) if err != nil { log.Fatalf("failed to add pod resource handler: %v", err) } _, err = nodeInformer.Informer().AddEventHandler(&cache.ResourceEventHandlerFuncs{ AddFunc: s.onAddNode, DeleteFunc: s.onDeleteNode, UpdateFunc: s.onUpdateNode, }) if err != nil { log.Fatalf("failed to add node resource handler: %v", err) } go s.syncControl() return s } func allPodIPs(pod *v1.Pod) []string { var ipList []string if pod.Status.PodIPs != nil { for _, ip := range pod.Status.PodIPs { ipList = append(ipList, ip.IP) } } else if pod.Status.PodIP != "" { ipList = append(ipList, pod.Status.PodIP) } else { log.Infof("pod %s/%s has no ip, skip", pod.Namespace, pod.Name) return nil } return ipList } func createCacheEntries4Pod(pod *v1.Pod) []*rpc.CacheEntry { if pod.Spec.HostNetwork { return nil } var entries []*rpc.CacheEntry for _, ip := range allPodIPs(pod) { entries = append(entries, &rpc.CacheEntry{ IP: ip, Meta: &rpc.CacheEntry_Pod{ Pod: &rpc.PodMeta{ Namespace: pod.Namespace, Name: pod.Name, }, }, }) } return entries } func createCacheEntries4Node(node *v1.Node) []*rpc.CacheEntry { var entries []*rpc.CacheEntry for _, ip := range allNodeIPs(node) { entries = append(entries, &rpc.CacheEntry{ IP: ip, Meta: &rpc.CacheEntry_Node{ Node: &rpc.NodeMeta{ Name: node.Name, }, }, }) } return entries } func allNodeIPs(node *v1.Node) []string { var ret []string for _, addr := range node.Status.Addresses { if addr.Type == v1.NodeInternalIP || addr.Type == v1.NodeExternalIP { ret = append(ret, addr.Address) } } return ret } func (s *Service) onAddPod(obj interface{}) { pod := obj.(*v1.Pod) entries := createCacheEntries4Pod(pod) if len(entries) > 0 { s.logChange(rpc.OpCode_Set, entries) } } func (s *Service) onDeletePod(obj interface{}) { pod, ok := obj.(*v1.Pod) if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) return } pod, ok = tombstone.Obj.(*v1.Pod) if !ok { utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj)) return } } entries := createCacheEntries4Pod(pod) if len(entries) == 0 { return } s.logChange(rpc.OpCode_Del, entries) } func (s *Service) onUpdatePod(old interface{}, cur interface{}) { newPod := cur.(*v1.Pod) oldPod := old.(*v1.Pod) if newPod.ResourceVersion == oldPod.ResourceVersion { return } oldIPs := allPodIPs(oldPod) newIPs := allPodIPs(newPod) if reflect.DeepEqual(oldIPs, newIPs) { return } toRemove := createCacheEntries4Pod(oldPod) if len(toRemove) > 0 { s.logChange(rpc.OpCode_Del, toRemove) } toAdd := createCacheEntries4Pod(newPod) if len(toAdd) > 0 { s.logChange(rpc.OpCode_Set, toAdd) } } func (s *Service) onAddNode(obj interface{}) { node := obj.(*v1.Node) entries := createCacheEntries4Node(node) if len(entries) > 0 { s.logChange(rpc.OpCode_Set, entries) } } func (s *Service) onDeleteNode(obj interface{}) { node, ok := obj.(*v1.Node) if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) return } node, ok = tombstone.Obj.(*v1.Node) if !ok { utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj)) return } } entries := createCacheEntries4Node(node) if len(entries) > 0 { s.logChange(rpc.OpCode_Del, entries) } } func (s *Service) onUpdateNode(old interface{}, cur interface{}) { newNode := cur.(*v1.Node) oldNode := old.(*v1.Node) if newNode.ResourceVersion == oldNode.ResourceVersion { return } oldIPs := allNodeIPs(oldNode) newIPs := allNodeIPs(newNode) if reflect.DeepEqual(oldIPs, newIPs) { return } toRemove := createCacheEntries4Node(oldNode) if len(toRemove) > 0 { s.logChange(rpc.OpCode_Del, toRemove) } toAdd := createCacheEntries4Node(newNode) if len(toAdd) > 0 { s.logChange(rpc.OpCode_Set, toAdd) } } func (s *Service) logChange(op rpc.OpCode, entries []*rpc.CacheEntry) { if len(entries) == 0 { return } s.logLock.Lock() defer s.logLock.Unlock() for _, e := range entries { revision := s.storage.revision.Add(1) s.logChan <- &changeLog{ revision: revision, opcode: op, entry: e, } } } func apply(m map[string]*rpc.CacheEntry, cl *changeLog) { entry := cl.entry switch cl.opcode { case rpc.OpCode_Set: m[entry.IP] = entry case rpc.OpCode_Del: delete(m, entry.IP) default: log.Errorf("invalid opcode %d", cl.opcode) } } func (s *Service) compactLog() { s.clientsLock.RLock() defer s.clientsLock.RUnlock() s.storage.snapshot.lock.Lock() defer s.storage.snapshot.lock.Unlock() var minRevision uint64 for e := s.clients.Front(); e != nil; e = e.Next() { c := e.Value.(*client) if c.closed { continue } if c.revision < minRevision { minRevision = c.revision } } if minRevision == 0 { return } var first, last uint64 for i := 0; i < len(s.storage.log); { cl := s.storage.log[i] if cl.revision > minRevision { break } if first == 0 { first = cl.revision } last = cl.revision apply(s.storage.snapshot.entries, cl) s.storage.snapshot.revision = cl.revision } if last > 0 { log.Infof("compact log from %d(include) to %d(include)", first, last) } } func (s *Service) copyClients(filter ...clientstate) []*client { s.clientsLock.RLock() defer s.clientsLock.RUnlock() var ret []*client for e := s.clients.Front(); e != nil; e = e.Next() { c := e.Value.(*client) if c.closed { close(c.ch) s.clients.Remove(e) continue } if len(filter) == 0 || slices.Contains(filter, c.state) { ret = append(ret, c) } } return ret } func (s *Service) syncControl() { ticker := time.NewTicker(500 * time.Millisecond) var pendingLogs []*changeLog var maxSyncedRevision uint64 for { select { case cl := <-s.logChan: //TODO check change log effectiveness, if change log has no effect on current data, ignore it to reduce sync s.storage.log = append(s.storage.log, cl) if len(s.storage.log) > CompactThreshold { s.compactLog() } pendingLogs = append(pendingLogs, cl) if len(pendingLogs) > MaxSyncBatchSize { s.syncUpToDateClients(pendingLogs, maxSyncedRevision) maxSyncedRevision = pendingLogs[len(pendingLogs)-1].revision pendingLogs = nil } case <-ticker.C: if len(pendingLogs) > 0 { s.syncUpToDateClients(pendingLogs, maxSyncedRevision) maxSyncedRevision = pendingLogs[len(pendingLogs)-1].revision pendingLogs = nil } s.syncClients(maxSyncedRevision) } } } func (s *Service) syncUpToDateClients(cls []*changeLog, upToDateRevision uint64) { clients := s.copyClients(uptodate) for _, c := range clients { if c.revision != upToDateRevision { panic(fmt.Sprintf("expect revision %d for up to date client, but got %d", upToDateRevision, c.revision)) } s.syncOneUpdateToDateClient(c, cls) } } func (s *Service) syncOneUpdateToDateClient(c *client, cls []*changeLog) { for _, cl := range cls { select { case c.ch <- cl: c.revision = cl.revision default: c.state = blocking return } } c.state = uptodate } func (s *Service) syncClients(upToDateRevision uint64) { clients := s.copyClients(stale, blocking) for _, c := range clients { s.syncOne(c, upToDateRevision) } } func (s *Service) syncOne(c *client, upToDateRevision uint64) { index := findNext(s.storage.log, c.revision) if index == -1 { return } for index < len(s.storage.log) { cl := s.storage.log[index] select { case c.ch <- cl: index++ c.revision = cl.revision if cl.revision == upToDateRevision { c.state = uptodate return } if cl.revision > upToDateRevision { //impossible c.state = stale return } default: c.state = blocking return } } } var _ rpc.IPCacheServiceServer = &Service{}