pkg/exporter/probe/proctcpsummary/proctcp.go (257 lines of code) (raw):

package proctcpsummary import ( "bufio" "context" "encoding/hex" "fmt" "io" "net" "os" "strconv" "strings" "github.com/alibaba/kubeskoop/pkg/exporter/probe" log "github.com/sirupsen/logrus" "github.com/alibaba/kubeskoop/pkg/exporter/nettop" ) const ( ModuleName = "proctcpsummary" TCPEstablishedConn = "tcpestablishedconn" TCPTimeWaitConn = "tcptimewaitconn" TCPCloseWaitConn = "tcpclosewaitconn" TCPSynSentConn = "tcpsynsentconn" TCPSynRecvConn = "tcpsynrecvconn" TCPTXQueue = "tcptxqueue" TCPRXQueue = "tcprxqueue" TCPListenBacklog = "tcplistenbacklog" // st mapping of tcp state /*TCPEstablished:1 TCP_SYN_SENT:2 TCP_SYN_RECV:3 TCP_FIN_WAIT1:4 TCP_FIN_WAIT2:5 TCPTimewait:6 TCP_CLOSE:7 TCP_CLOSE_WAIT:8 TCP_LAST_ACL:9 TCPListen:10 TCP_CLOSING:11*/ TCPEstablished = 1 TCPSynSent = 2 TCPSynRecv = 3 TCPTimewait = 6 TCPCloseWait = 8 TCPListen = 10 readLimit = 4294967296 // Byte -> 4 GiB ) type ( NetIPSocket []*netIPSocketLine NetIPSocketSummary struct { TxQueueLength uint64 RxQueueLength uint64 UsedSockets uint64 } netIPSocketLine struct { Sl uint64 LocalAddr net.IP LocalPort uint64 RemAddr net.IP RemPort uint64 St uint64 TxQueue uint64 RxQueue uint64 UID uint64 Inode uint64 } NetTCP []*netIPSocketLine NetTCPSummary NetIPSocketSummary ) var ( TCPSummaryMetrics = []probe.LegacyMetric{ {Name: TCPEstablishedConn, Help: "The total number of established TCP connections."}, {Name: TCPTimeWaitConn, Help: "The total number of TCP connections in the TIME_WAIT state."}, {Name: TCPCloseWaitConn, Help: "The total number of TCP connections in the CLOSE_WAIT state."}, {Name: TCPSynSentConn, Help: "The total number of TCP connections in the SYN_SENT state."}, {Name: TCPSynRecvConn, Help: "The total number of TCP connections in the SYN_RECV state."}, {Name: TCPTXQueue, Help: "The total size of the TCP transmit queue."}, {Name: TCPRXQueue, Help: "The total size of the TCP receive queue."}, } probeName = "tcpsummary" ) func init() { probe.MustRegisterMetricsProbe(probeName, softNetProbeCreator) } func softNetProbeCreator() (probe.MetricsProbe, error) { p := &ProcTCP{} batchMetrics := probe.NewLegacyBatchMetrics(probeName, TCPSummaryMetrics, p.CollectOnce) return probe.NewMetricsProbe(probeName, p, batchMetrics), nil } type ProcTCP struct { } func (s *ProcTCP) Start(_ context.Context) error { return nil } func (s *ProcTCP) Stop(_ context.Context) error { return nil } func (s *ProcTCP) CollectOnce() (map[string]map[uint32]uint64, error) { ets := nettop.GetAllUniqueNetnsEntity() if len(ets) == 0 { log.Infof("failed collect tcp summary, no entity found") } return collect(ets), nil } func collect(pidlist []*nettop.Entity) map[string]map[uint32]uint64 { resMap := make(map[string]map[uint32]uint64) for idx := range TCPSummaryMetrics { resMap[TCPSummaryMetrics[idx].Name] = map[uint32]uint64{} } for idx := range pidlist { path := fmt.Sprintf("/proc/%d/net/tcp", pidlist[idx].GetPid()) summary, err := newNetTCP(path) if err != nil { log.Warnf("failed collect tcp, path %s, err: %v", path, err) continue } summary6, err := newNetTCP(fmt.Sprintf("/proc/%d/net/tcp6", pidlist[idx].GetPid())) if err != nil { log.Warnf("failed collect tcp6, path %s, err: %v", path, err) continue } est, tw, cw, ss, sr := summary.getEstTwCount() est6, tw6, cw6, ss6, sr6 := summary6.getEstTwCount() tx, rx := summary.getTxRxQueueLength() tx6, rx6 := summary6.getTxRxQueueLength() nsinum := uint32(pidlist[idx].GetNetns()) resMap[TCPEstablishedConn][nsinum] = est + est6 resMap[TCPTimeWaitConn][nsinum] = tw + tw6 resMap[TCPCloseWaitConn][nsinum] = cw + cw6 resMap[TCPSynSentConn][nsinum] = ss + ss6 resMap[TCPSynRecvConn][nsinum] = sr + sr6 resMap[TCPTXQueue][nsinum] = tx + tx6 resMap[TCPRXQueue][nsinum] = rx + rx6 } return resMap } func (n NetTCP) getEstTwCount() (est, tw, cw, ss, sr uint64) { for idx := range n { switch n[idx].St { case TCPEstablished: est++ case TCPTimewait: tw++ case TCPCloseWait: cw++ case TCPSynSent: ss++ case TCPSynRecv: sr++ } } return est, tw, cw, ss, sr } func (n NetTCP) getTxRxQueueLength() (tx uint64, rx uint64) { for idx := range n { if n[idx].St != TCPListen { tx += n[idx].TxQueue rx += n[idx].RxQueue } } return tx, rx } // newNetTCP creates a new NetTCP{,6} from the contents of the given file. func newNetTCP(file string) (NetTCP, error) { n, err := newNetIPSocket(file) n1 := NetTCP(n) return n1, err } func newNetIPSocket(file string) (NetIPSocket, error) { f, err := os.Open(file) if err != nil { return nil, err } defer f.Close() var netIPSocket NetIPSocket lr := io.LimitReader(f, readLimit) s := bufio.NewScanner(lr) s.Scan() // skip first line with headers for s.Scan() { fields := strings.Fields(s.Text()) line, err := parseNetIPSocketLine(fields) if err != nil { return nil, err } netIPSocket = append(netIPSocket, line) } if err := s.Err(); err != nil { return nil, err } return netIPSocket, nil } // parseNetIPSocketLine parses a single line, represented by a list of fields. func parseNetIPSocketLine(fields []string) (*netIPSocketLine, error) { line := &netIPSocketLine{} if len(fields) < 10 { return nil, fmt.Errorf( "cannot parse net socket line as it has less then 10 columns %q", strings.Join(fields, " "), ) } var err error // parse error // sl s := strings.Split(fields[0], ":") if len(s) != 2 { return nil, fmt.Errorf("cannot parse sl field in socket line %q", fields[0]) } if line.Sl, err = strconv.ParseUint(s[0], 0, 64); err != nil { return nil, fmt.Errorf("cannot parse sl value in socket line: %w", err) } // local_address l := strings.Split(fields[1], ":") if len(l) != 2 { return nil, fmt.Errorf("cannot parse local_address field in socket line %q", fields[1]) } if line.LocalAddr, err = parseIP(l[0]); err != nil { return nil, err } if line.LocalPort, err = strconv.ParseUint(l[1], 16, 64); err != nil { return nil, fmt.Errorf("cannot parse local_address port value in socket line: %w", err) } // remote_address r := strings.Split(fields[2], ":") if len(r) != 2 { return nil, fmt.Errorf("cannot parse rem_address field in socket line %q", fields[1]) } if line.RemAddr, err = parseIP(r[0]); err != nil { return nil, err } if line.RemPort, err = strconv.ParseUint(r[1], 16, 64); err != nil { return nil, fmt.Errorf("cannot parse rem_address port value in socket line: %w", err) } // st if line.St, err = strconv.ParseUint(fields[3], 16, 64); err != nil { return nil, fmt.Errorf("cannot parse st value in socket line: %w", err) } // tx_queue and rx_queue q := strings.Split(fields[4], ":") if len(q) != 2 { return nil, fmt.Errorf( "cannot parse tx/rx queues in socket line as it has a missing colon %q", fields[4], ) } if line.TxQueue, err = strconv.ParseUint(q[0], 16, 64); err != nil { return nil, fmt.Errorf("cannot parse tx_queue value in socket line: %w", err) } if line.RxQueue, err = strconv.ParseUint(q[1], 16, 64); err != nil { return nil, fmt.Errorf("cannot parse rx_queue value in socket line: %w", err) } // uid if line.UID, err = strconv.ParseUint(fields[7], 0, 64); err != nil { return nil, fmt.Errorf("cannot parse uid value in socket line: %w", err) } // inode if line.Inode, err = strconv.ParseUint(fields[9], 0, 64); err != nil { return nil, fmt.Errorf("cannot parse inode value in socket line: %w", err) } return line, nil } func parseIP(hexIP string) (net.IP, error) { var byteIP []byte byteIP, err := hex.DecodeString(hexIP) if err != nil { return nil, fmt.Errorf("cannot parse address field in socket line %q", hexIP) } switch len(byteIP) { case 4: return net.IP{byteIP[3], byteIP[2], byteIP[1], byteIP[0]}, nil case 16: i := net.IP{ byteIP[3], byteIP[2], byteIP[1], byteIP[0], byteIP[7], byteIP[6], byteIP[5], byteIP[4], byteIP[11], byteIP[10], byteIP[9], byteIP[8], byteIP[15], byteIP[14], byteIP[13], byteIP[12], } return i, nil default: return nil, fmt.Errorf("unable to parse IP %s", hexIP) } }