pkg/metrics/collector/pod_ip_metrics.go (387 lines of code) (raw):

/*
Copyright 2022 Google Inc.

Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the License. You may obtain a copy of
the License at

	http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations under
the License.
*/

package collector

import (
	"bufio"
	"context"
	"fmt"
	"math"
	"net"
	"os"
	"strings"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"

	"github.com/fsnotify/fsnotify"
	"github.com/golang/glog"
	"github.com/prometheus/client_golang/prometheus"
)

// ipFamily identifies which IP address families a pod has been observed with.
type ipFamily int

const (
	ipv4 ipFamily = iota
	ipv6
	// dual means the pod has both an IPv4 and an IPv6 address.
	dual

	// gkePodNetworkDir is the host-mounted CNI IPAM state directory: it
	// contains one file per allocated IP, named after the IP itself, whose
	// first line is the owning pod's ID.
	gkePodNetworkDir = "/host/var/lib/cni/networks/gke-pod-network"
	// dualStack is the stack-type value indicating a dual-stack cluster.
	dualStack = "IPV4_IPV6"

	// To configure the prometheus histogram bucket for ip reuse.
	// Values are in milliseconds: 12 buckets of 5s width starting at 5s.
	bucketStart = 5e3
	bucketWidth = 5e3
	bucketCount = 12
)

var (
	usedIPv4AddrCountDesc = prometheus.NewDesc(
		"used_ipv4_addr_count",
		"Indicates how many IPv4 addresses are in use.",
		nil, nil,
	)
	usedIPv6AddrCountDesc = prometheus.NewDesc(
		"used_ipv6_addr_count",
		"Indicates how many IPv6 addresses are in use.",
		nil, nil,
	)
	dualStackCountDesc = prometheus.NewDesc(
		"dual_stack_count",
		"Indicates how many pods have dual stack IP addresses.",
		nil, nil,
	)
	dualStackErrorCountDesc = prometheus.NewDesc(
		"dual_stack_error_count",
		"Indicates how many pods did not get dual stack IPs erroneously.",
		nil, nil,
	)
	duplicateIPCountDesc = prometheus.NewDesc(
		"duplicate_ip_count",
		"Indicates how many pods had more than one IP per family.",
		nil, nil,
	)
	ipReuseMinDesc = prometheus.NewDesc(
		"ip_reuse_minimum_time_milliseconds",
		"Indicates the minimum IP reuse time.",
		nil, nil,
	)
	ipReuseAvgDesc = prometheus.NewDesc(
		"ip_reuse_average_time_milliseconds",
		"Indicates the average IP reuse time.",
		nil, nil,
	)
	// We want 60 seconds to be the threshold for watching quick ip reuse cases.
	// So the histogram will fill the 12 buckets until 60 seconds with size 5 seconds.
	// Others that are above 60 seconds will go to bucket {le="+Inf"}, which are out-of-concerned.
	bucketKeys = prometheus.LinearBuckets(bucketStart, bucketWidth, bucketCount)

	ipReuseHistogramDesc = prometheus.NewDesc(
		"ip_reuse_time_duration_milliseconds",
		"Indicates the IP reuse duration in millisecond for all IPs.",
		nil, nil,
	)
	assignedIPv4AddrCountDesc = prometheus.NewDesc(
		"ipv4_assigned_count",
		"Indicates the total IPv4 IPs assigned to the subnetwork.",
		nil, nil,
	)
	assignedIPv6AddrCountDesc = prometheus.NewDesc(
		"ipv6_assigned_count",
		"Indicates the total IPv6 IPs assigned to the subnetwork.",
		nil, nil,
	)
)

// podIPMetricsCollector gathers pod IP allocation metrics by scanning the CNI
// IPAM directory and watching it for changes.
//
// NOTE(review): the counter fields are written by the watcher goroutine
// (setupDirectoryWatcher) and read by Update without synchronization —
// confirm whether the metrics framework serializes these, otherwise this is
// a data race.
type podIPMetricsCollector struct {
	usedIPv4AddrCount   uint64 // IPv4 address files currently present
	usedIPv6AddrCount   uint64 // IPv6 address files currently present
	dualStackCount      uint64 // pods with one IP of each family
	dualStackErrorCount uint64 // pods missing a family on a dual-stack cluster
	duplicateIPCount    uint64 // pods with more than one IP of the same family
	// reuseIPs aggregates IP-reuse latency (count/sum/min/histogram buckets).
	reuseIPs reuseIPs
	// reuseMap tracks, per IPv4 address, when it was last released so the
	// reuse interval can be computed when it is allocated again.
	reuseMap  map[string]*ipReuse
	clientset kubernetes.Interface
	nodeName  string
	// clock abstracts time.Now so tests can presumably inject a fake clock.
	clock                            clock
	assignedIPv4AddrCount            uint64
	assignedIPv6AddrCount            uint64
	podIPMetricsWatcherIsInitialized bool
}

// reuseIPs holds the running aggregates used to build the IP-reuse gauge and
// histogram metrics. All durations are in milliseconds.
type reuseIPs struct {
	count   uint64
	sum     float64
	min     uint64
	buckets map[float64]uint64
}

// ipReuse contains values for reuseMap tracking ip reuse status.
type ipReuse struct {
	ipReleasedTimestamp time.Time
	ipReuseInterval     float64
}

// clock is a minimal time source, satisfied by realClock in production.
type clock interface {
	Now() time.Time
}

type realClock struct{}

// Now returns the current wall-clock time.
func (c *realClock) Now() time.Time { return time.Now() }

// String returns the canonical name of the IP family.
func (ip ipFamily) String() string { return [...]string{"IPV4", "IPV6", "IPV4_IPV6"}[ip] }

func init() {
	registerCollector("pod_ip_metrics", NewPodIPMetricsCollector)
}

// NewPodIPMetricsCollector returns a new Collector exposing pod IP allocation
// stats.
func NewPodIPMetricsCollector() (Collector, error) { config, err := rest.InClusterConfig() if err != nil { return nil, fmt.Errorf("error creating in-cluster config: %v", err) } config.ContentType = runtime.ContentTypeProtobuf clientset, err := kubernetes.NewForConfig(config) if err != nil { return nil, fmt.Errorf("error creating clientset: %v", err) } nodeName := os.Getenv("CURRENT_NODE_NAME") if nodeName == "" { nodeName, err = os.Hostname() if err != nil { return nil, fmt.Errorf("error getting hostname: %v", err) } } return &podIPMetricsCollector{ clientset: clientset, nodeName: nodeName, clock: &realClock{}, }, nil } func readLine(path string) (string, error) { buf, err := os.Open(path) if err != nil { return "", err } defer func() { if err = buf.Close(); err != nil { glog.Errorf("Error closing file %s: %s", path, err) } }() s := bufio.NewScanner(buf) s.Scan() return s.Text(), s.Err() } func (c *podIPMetricsCollector) listIPAddresses(dir string) error { f, err := os.Open(dir) if err != nil { glog.Errorf("Error opening directory %s, %v", dir, err) return err } files, err := f.Readdir(0) if err != nil { glog.Errorf("Error while reading files in directory %v", err) return err } podMap := make(map[string]ipFamily) var ipv4Count, ipv6Count, dupIPCount, dualCount, dualErrCount uint64 for _, v := range files { if ip := net.ParseIP(v.Name()); ip == nil { // This isn't an IP address file continue } var family ipFamily if strings.Contains(v.Name(), ":") { ipv6Count++ family = ipv6 } else { ipv4Count++ family = ipv4 } // Update the map and track IP families only for dual stack clusters fileName := fmt.Sprintf("%s/%s", dir, v.Name()) podID, err := readLine(fileName) if err != nil { glog.Errorf("Error reading file %s: %v", fileName, err) continue } f, ok := podMap[podID] // TODO: #351 - fix lint errors //nolint:gocritic if !ok { podMap[podID] = family } else if (f == ipv4 && family == ipv6) || (f == ipv6 && family == ipv4) { podMap[podID] = dual } else { dupIPCount++ } } if 
stackType == dualStack { for _, family := range podMap { if family == dual { dualCount++ } else { dualErrCount++ } } } c.usedIPv4AddrCount = ipv4Count c.usedIPv6AddrCount = ipv6Count c.dualStackCount = dualCount c.dualStackErrorCount = dualErrCount c.duplicateIPCount = dupIPCount return nil } func getIPv4(s string) (bool, string) { names := strings.Split(s, "/") f := names[len(names)-1] ip := net.ParseIP(f) if ip == nil || ip.To4() == nil { return false, "" } return true, f } // After the ip is removed, it will be put into the reuseMap. // After the ip is reused, the reuseMap will update the ip reuse interval. // The reuseMap will not record any ip that is being used unless it is reused. func (c *podIPMetricsCollector) updateReuseIPStats(e fsnotify.Event, f string) { reuseIP, ok := c.reuseMap[f] switch { case e.Op&fsnotify.Remove == fsnotify.Remove: if !ok { c.reuseMap[f] = &ipReuse{c.clock.Now(), 0} } else { reuseIP.ipReleasedTimestamp = c.clock.Now() } case e.Op&fsnotify.Create == fsnotify.Create: if ok { oldT := reuseIP.ipReleasedTimestamp diff := uint64(c.clock.Now().Sub(oldT).Milliseconds()) if diff > 0 { reuseIP.ipReuseInterval = float64(diff) c.reuseIPs.count++ c.reuseIPs.sum += float64(diff) if diff < c.reuseIPs.min { c.reuseIPs.min = diff } c.fillBuckets(diff) } } } } func (c *podIPMetricsCollector) fillBuckets(diff uint64) { for _, bound := range bucketKeys { if float64(diff) < bound { c.reuseIPs.buckets[bound]++ } } } // countIPsFronRange returns the number of available hosts in a subnet. // The max number is limited by the size of an uint64. // Number of hosts is calculated with the formula: // IPv4: 2^x – 2, not consider network and broadcast address // IPv6: 2^x - 1, not consider network address // where x is the number of host bits in the subnet. 
func (c *podIPMetricsCollector) countIPsFromRange(subnet *net.IPNet) (uint64, error) { ones, bits := subnet.Mask.Size() if bits <= ones { return 0, fmt.Errorf("invalid subnet mask: %v", subnet.Mask) } // this checks that we are not overflowing an int64 if bits-ones >= 64 { return math.MaxUint64, nil } count := uint64(1) << uint(bits-ones) count-- if subnet.IP.To4() != nil { // Don't use the IPv4 network's broadcast address if count == 0 { return 0, fmt.Errorf("subnet includes only the network and broadcast addresses") } count-- } else if count == 0 { return 0, fmt.Errorf("subnet includes only the network address") } return count, nil } func (c *podIPMetricsCollector) updateAssignedIPs(subnet *net.IPNet, totalIP uint64) { if subnet.IP.To16() != nil && subnet.IP.To4() == nil { c.assignedIPv6AddrCount += totalIP } else if subnet.IP.To4() != nil { c.assignedIPv4AddrCount += totalIP } } func (c *podIPMetricsCollector) calculateAssignedIP() error { node, err := c.clientset.CoreV1().Nodes().Get(context.Background(), c.nodeName, metav1.GetOptions{}) if err != nil { return fmt.Errorf("error getting node %s: %v", c.nodeName, err) } podCIDRs := node.Spec.PodCIDRs if len(podCIDRs) == 0 { if node.Spec.PodCIDR == "" { return fmt.Errorf("both podCIDR and podCIDRs are empty") } podCIDRs = []string{node.Spec.PodCIDR} } var firstErr error for _, podCIDR := range podCIDRs { _, subnet, err := net.ParseCIDR(podCIDR) if err != nil { if firstErr == nil { firstErr = fmt.Errorf("error parsing podCIDR %s: %v", podCIDR, err) } continue } totalIP, err := c.countIPsFromRange(subnet) if err != nil { if firstErr == nil { firstErr = fmt.Errorf("error calculating total IPs for subnet %s: %v", subnet.IP, err) } continue } c.updateAssignedIPs(subnet, totalIP) } return firstErr } func (c *podIPMetricsCollector) setupDirectoryWatcher(dir string) error { if err := c.listIPAddresses(dir); err != nil { return err } watcher, err := fsnotify.NewWatcher() if err != nil { glog.Errorf("NewWatcher failed: %v", 
err) return err } c.reuseIPs.min = uint64(math.Inf(+1)) c.reuseIPs.buckets = make(map[float64]uint64) for _, bound := range bucketKeys { c.reuseIPs.buckets[bound] = 0 } c.reuseMap = make(map[string]*ipReuse) go func() { defer func() { watcher.Close() c.podIPMetricsWatcherIsInitialized = false }() for { select { case e, ok := <-watcher.Events: if !ok { glog.Error("watcher is not ok") return } if err := c.listIPAddresses(dir); err != nil { //nolint:govet return } // Only update the ip reuse mininum, average and histogram for IPv4. ok, f := getIPv4(e.Name) if ok { c.updateReuseIPStats(e, f) } case err, ok := <-watcher.Errors: //nolint:govet glog.Errorf("Received error from watcher %v, ok: %t", err, ok) if !ok { return } } } }() err = watcher.Add(dir) if err != nil { glog.Errorf("Failed to add watcher for directory %s: %v", dir, err) } c.podIPMetricsWatcherIsInitialized = true return nil } func (c *podIPMetricsCollector) Update(ch chan<- prometheus.Metric) error { if !c.podIPMetricsWatcherIsInitialized { if err := c.setupDirectoryWatcher(gkePodNetworkDir); err != nil { glog.Errorf("setupDirectoryWatcher returned error: %v", err) return nil } if err := c.calculateAssignedIP(); err != nil { glog.Errorf("calculateAssignedIP returned error: %v", err) } } c.populateMetrics(ch) return nil } func (c *podIPMetricsCollector) populateMetrics(ch chan<- prometheus.Metric) { ch <- prometheus.MustNewConstMetric(usedIPv4AddrCountDesc, prometheus.GaugeValue, float64(c.usedIPv4AddrCount)) ch <- prometheus.MustNewConstMetric(usedIPv6AddrCountDesc, prometheus.GaugeValue, float64(c.usedIPv6AddrCount)) ch <- prometheus.MustNewConstMetric(dualStackCountDesc, prometheus.GaugeValue, float64(c.dualStackCount)) ch <- prometheus.MustNewConstMetric(dualStackErrorCountDesc, prometheus.GaugeValue, float64(c.dualStackErrorCount)) ch <- prometheus.MustNewConstMetric(duplicateIPCountDesc, prometheus.GaugeValue, float64(c.duplicateIPCount)) ch <- prometheus.MustNewConstMetric(ipReuseMinDesc, 
prometheus.GaugeValue, float64(c.reuseIPs.min)) ch <- prometheus.MustNewConstMetric(ipReuseAvgDesc, prometheus.GaugeValue, c.reuseIPs.sum/float64(c.reuseIPs.count)) ch <- prometheus.MustNewConstHistogram(ipReuseHistogramDesc, c.reuseIPs.count, c.reuseIPs.sum, c.reuseIPs.buckets) ch <- prometheus.MustNewConstMetric(assignedIPv4AddrCountDesc, prometheus.GaugeValue, float64(c.assignedIPv4AddrCount)) ch <- prometheus.MustNewConstMetric(assignedIPv6AddrCountDesc, prometheus.GaugeValue, float64(c.assignedIPv6AddrCount)) }