pkg/metrics/collector/netlink_metrics.go (316 lines of code) (raw):

/* Copyright 2021 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package collector import ( "flag" "fmt" "os" "strconv" "sync" "syscall" "github.com/golang/glog" "github.com/prometheus/client_golang/prometheus" "github.com/vishvananda/netlink/nl" "github.com/vishvananda/netns" "golang.org/x/sys/unix" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "github.com/GoogleCloudPlatform/netd/pkg/tcp_metrics/inetdiag" "github.com/GoogleCloudPlatform/netd/pkg/tcp_metrics/parser" "github.com/GoogleCloudPlatform/netd/pkg/tcp_metrics/tcp" ) // Note, a sum over the pod tcp connections/ retransmits != the same value // as the node aggregates. This is because the node aggregates don't just // sum over pod namespaces but all namespaces including the default one. var ( NodeTCPConnectionsDesc = prometheus.NewDesc( "node_tcp_connections", "Current amount of tcp connections on a node", nil, nil, ) NodeActiveTCPRetransmits = prometheus.NewDesc( "node_tcp_retransmits", "Current amount of tcp retransmits on a node", nil, nil, ) TCPConnectionsDesc = prometheus.NewDesc( "active_tcp_connections", "Number of active TCP connections by hosted pod name/namespace", []string{"pod_name", "namespace_name"}, nil, ) ActiveTCPRetransmits = prometheus.NewDesc( "active_tcp_retransmits", "Number of retransmits on active TCP connections by hosted pod name/namespace", []string{"pod_name", "namespace_name"}, nil, ) ) type netlinkCollector struct { } type netlinkStats struct { retransmits uint64 tcpConnections uint64 } var ( ipMap = initIPMap() nodeName = os.Getenv("CURRENT_NODE_NAME") enablePodWatch bool firstRun = true ) func init() { flag.BoolVar(&enablePodWatch, "enable-pod-watch", false, "Enable pod watch on netlink_metrics") registerCollector("netlink_metrics", NewNetlinkCollector) } // Functionality for multithreaded map usage. type safeIPMap struct { // Protects the map from multiple writes mapMux sync.RWMutex ipMap map[string]*v1.Pod } func initIPMap() *safeIPMap { var newIPMap safeIPMap newIPMap.ipMap = make(map[string]*v1.Pod) return &newIPMap } func (curMap *safeIPMap) safeIPWrite(key string, value *v1.Pod) { curMap.mapMux.Lock() defer curMap.mapMux.Unlock() curMap.ipMap[key] = value } func (curMap *safeIPMap) safeIPDelete(key string) { curMap.mapMux.Lock() defer curMap.mapMux.Unlock() delete(curMap.ipMap, key) } func (curMap *safeIPMap) safeIPRead(key string) (*v1.Pod, bool) { curMap.mapMux.RLock() defer curMap.mapMux.RUnlock() podDef, ok := curMap.ipMap[key] return podDef, ok } // Functionality for pod watching. func createPodWatch() error { // Creates the in-cluster config. config, err := rest.InClusterConfig() if err != nil { return err } config.ContentType = runtime.ContentTypeProtobuf // creates the clientset clientset, err := kubernetes.NewForConfig(config) if err != nil { return err } watchlist := cache.NewListWatchFromClient( clientset.CoreV1().RESTClient(), "pods", v1.NamespaceAll, fields.OneTermEqualSelector("spec.nodeName", nodeName), ) _, controller := cache.NewInformerWithOptions(cache.InformerOptions{ ListerWatcher: watchlist, ObjectType: &v1.Pod{}, Handler: cache.ResourceEventHandlerFuncs{ AddFunc: onPodAdd, UpdateFunc: onPodUpdate, DeleteFunc: onPodDelete, }, }) stopper := make(chan struct{}) go controller.Run(stopper) if !cache.WaitForCacheSync(stopper, controller.HasSynced) { glog.Infof("Timed out waiting for caches to sync.") } return nil } func hasPodIP(pod *v1.Pod) bool { // Pod not assigned. if pod.Status.PodIP == "" { return false } // Pod running on host network. if pod.Status.PodIP == pod.Status.HostIP { return false } return true } func onPodAdd(podObj interface{}) { pod := podObj.(*v1.Pod) if hasPodIP(pod) { ipMap.safeIPWrite(pod.Status.PodIP, pod) } } func onPodUpdate(oldPod, newPod interface{}) { prevPod := oldPod.(*v1.Pod) pod := newPod.(*v1.Pod) if prevPod.Status.PodIP == pod.Status.PodIP { return } if hasPodIP(prevPod) { ipMap.safeIPDelete(prevPod.Status.PodIP) } if hasPodIP(pod) { ipMap.safeIPWrite(pod.Status.PodIP, pod) } glog.Infof("pods are: %+q", ipMap.ipMap) } func onPodDelete(podObj interface{}) { pod := podObj.(*v1.Pod) if hasPodIP(pod) { ipMap.safeIPDelete(pod.Status.PodIP) } } func updatePodStats(stats netlinkStats, tcpInfo *tcp.LinuxTCPInfo) netlinkStats { stats.tcpConnections++ if tcpInfo == nil { return stats } stats.retransmits += uint64(tcpInfo.Retrans) return stats } func createStatMap(snapshots []*parser.Snapshot) map[*v1.Pod]netlinkStats { statMap := make(map[*v1.Pod]netlinkStats) for _, snapshot := range snapshots { if snapshot.SockInfo == nil { continue } if pod, ok := ipMap.safeIPRead(snapshot.SockInfo.SrcIP); ok { statMap[pod] = updatePodStats(statMap[pod], snapshot.TCPInfo) } if pod, ok := ipMap.safeIPRead(snapshot.SockInfo.DstIP); ok { statMap[pod] = updatePodStats(statMap[pod], snapshot.TCPInfo) } } return statMap } func NewNetlinkCollector() (Collector, error) { return &netlinkCollector{}, nil } func getNamespaces() ([]netns.NsHandle, error) { files, err := os.ReadDir("/proc") if err != nil { return nil, err } pids := make([]int, 0) for _, f := range files { pid, err := strconv.Atoi(f.Name()) if err != nil { continue } pids = append(pids, pid) } nsSet := make(map[uint64]netns.NsHandle) var s unix.Stat_t for _, pid := range pids { ns, err := netns.GetFromPid(pid) if err != nil { continue } // Check for a new inode. if err := unix.Fstat(int(ns), &s); err != nil { continue } if _, ok := nsSet[s.Ino]; ok { ns.Close() continue } nsSet[s.Ino] = ns } namespaces := make([]netns.NsHandle, 0) for _, ns := range nsSet { namespaces = append(namespaces, ns) } return namespaces, nil } func getSnapshots(req *nl.NetlinkRequest) ([]*parser.Snapshot, error) { var snps []*parser.Snapshot sockType := syscall.NETLINK_INET_DIAG namespaces, err := getNamespaces() if err != nil { return nil, err } basens, err := netns.Get() if err != nil { return nil, err } for _, curNs := range namespaces { defer curNs.Close() s, err := nl.SubscribeAt(curNs, basens, sockType) if err != nil { glog.Infof("Could not subscribe to netlink namespaces %q", curNs) continue } defer s.Close() if err := s.Send(req); err != nil { //nolint:govet return nil, err } pid, err := s.GetPid() if err != nil { return nil, err } // Adapted this from req.Execute in nl_linux.go snapshotLoop: for { msgs, _, err := s.Receive() if err != nil { return nil, err } // TODO avoid the copy. for i := range msgs { m, shouldContinue, err := inetdiag.ProcessMessage(&msgs[i], req.Seq, pid) if err != nil { return nil, err } if m != nil { cur, err := parser.MakeSnapShot(m, true) if cur == nil || err != nil { continue } snps = append(snps, cur) } if !shouldContinue { break snapshotLoop } } } } return snps, nil } func getRequests() ([]*parser.Snapshot, error) { req6 := inetdiag.MakeReq(syscall.AF_INET6) req := inetdiag.MakeReq(syscall.AF_INET) res6, err := getSnapshots(req6) if err != nil { return nil, fmt.Errorf("error getting req6: %q", err) } res, err := getSnapshots(req) if err != nil { return nil, fmt.Errorf("error getting req: %q", err) } return append(res, res6...), nil } func getRetransmits(snapshots []*parser.Snapshot) uint64 { var retransmits uint64 retransmits = 0 for _, snap := range snapshots { if snap == nil || snap.TCPInfo == nil { continue } retransmits += uint64(snap.TCPInfo.Retrans) } return retransmits } func (c *netlinkCollector) Update(ch chan<- prometheus.Metric) error { if enablePodWatch && firstRun { if err := createPodWatch(); err != nil { glog.Infof("Error, pod watch not running with err, %q", err) } firstRun = false } snapshots, err := getRequests() if err != nil { return fmt.Errorf("could not get tcp requests: %q", err) } retransmits := getRetransmits(snapshots) ch <- prometheus.MustNewConstMetric(NodeTCPConnectionsDesc, prometheus.GaugeValue, float64(len(snapshots))) ch <- prometheus.MustNewConstMetric(NodeActiveTCPRetransmits, prometheus.GaugeValue, float64(retransmits)) if !enablePodWatch { return nil } statMap := createStatMap(snapshots) for pod, stats := range statMap { ch <- prometheus.MustNewConstMetric(TCPConnectionsDesc, prometheus.GaugeValue, float64(stats.tcpConnections), pod.ObjectMeta.Name, pod.ObjectMeta.Namespace) ch <- prometheus.MustNewConstMetric(ActiveTCPRetransmits, prometheus.GaugeValue, float64(stats.retransmits), pod.ObjectMeta.Name, pod.ObjectMeta.Namespace) } return nil }