filebeat/input/netmetrics/tcp.go (188 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package netmetrics import ( "bytes" "errors" "fmt" "os" "runtime" "strconv" "time" "github.com/rcrowley/go-metrics" "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/monitoring" "github.com/elastic/elastic-agent-libs/monitoring/adapter" ) // TCP handles the TCP metric reporting. type TCP struct { unregister func() done chan struct{} monitorRegistry *monitoring.Registry lastPacket time.Time device *monitoring.String // name of the device being monitored packets *monitoring.Uint // number of packets processed bytes *monitoring.Uint // number of bytes processed rxQueue *monitoring.Uint // value of the rx_queue field from /proc/net/tcp{,6} (only on linux systems) arrivalPeriod metrics.Sample // histogram of the elapsed time between packet arrivals processingTime metrics.Sample // histogram of the elapsed time between packet receipt and publication } // NewTCP returns a new TCP input metricset. Note that if the id is empty then a nil TCP metricset is returned. func NewTCP(inputName string, id string, device string, poll time.Duration, log *logp.Logger) *TCP { if id == "" { return nil } reg, unreg := inputmon.NewInputRegistry(inputName, id, nil) out := &TCP{ unregister: unreg, monitorRegistry: reg, device: monitoring.NewString(reg, "device"), packets: monitoring.NewUint(reg, "received_events_total"), bytes: monitoring.NewUint(reg, "received_bytes_total"), rxQueue: monitoring.NewUint(reg, "receive_queue_length"), arrivalPeriod: metrics.NewUniformSample(1024), processingTime: metrics.NewUniformSample(1024), } _ = adapter.NewGoMetrics(reg, "arrival_period", adapter.Accept). Register("histogram", metrics.NewHistogram(out.arrivalPeriod)) _ = adapter.NewGoMetrics(reg, "processing_time", adapter.Accept). Register("histogram", metrics.NewHistogram(out.processingTime)) out.device.Set(device) if poll > 0 && runtime.GOOS == "linux" { addr4, addr6, err := addrs(device, log) if err != nil { log.Warn(err) return out } out.done = make(chan struct{}) go out.poll(addr4, addr6, poll, log) } return out } // Log logs metric for the given packet. func (m *TCP) Log(data []byte, timestamp time.Time) { if m == nil { return } m.processingTime.Update(time.Since(timestamp).Nanoseconds()) m.packets.Add(1) m.bytes.Add(uint64(len(data))) if !m.lastPacket.IsZero() { m.arrivalPeriod.Update(timestamp.Sub(m.lastPacket).Nanoseconds()) } m.lastPacket = timestamp } // poll periodically gets TCP buffer stats from the OS. func (m *TCP) poll(addr, addr6 []string, each time.Duration, log *logp.Logger) { hasUnspecified, addrIsUnspecified, badAddr := containsUnspecifiedAddr(addr) if badAddr != nil { log.Warnf("failed to parse IPv4 addrs for metric collection %q", badAddr) } hasUnspecified6, addrIsUnspecified6, badAddr := containsUnspecifiedAddr(addr6) if badAddr != nil { log.Warnf("failed to parse IPv6 addrs for metric collection %q", badAddr) } // Do an initial check for access to the filesystem and of the // value constructed by containsUnspecifiedAddr. This gives a // base level for the rx_queue values and ensures that if the // constructed address values are malformed we panic early // within the period of system testing. want4 := true rx, err := procNetTCP("/proc/net/tcp", addr, hasUnspecified, addrIsUnspecified) if err != nil { want4 = false log.Infof("did not get initial tcp stats from /proc: %v", err) } want6 := true rx6, err := procNetTCP("/proc/net/tcp6", addr6, hasUnspecified6, addrIsUnspecified6) if err != nil { want6 = false log.Infof("did not get initial tcp6 stats from /proc: %v", err) } if !want4 && !want6 { log.Warnf("failed to get initial tcp or tcp6 stats from /proc: %v", err) } else { m.rxQueue.Set(uint64(rx + rx6)) } t := time.NewTicker(each) for { select { case <-t.C: var found bool rx, err := procNetTCP("/proc/net/tcp", addr, hasUnspecified, addrIsUnspecified) if err != nil { if want4 { log.Warnf("failed to get tcp stats from /proc: %v", err) } } else { found = true want4 = true } rx6, err := procNetTCP("/proc/net/tcp6", addr6, hasUnspecified6, addrIsUnspecified6) if err != nil { if want6 { log.Warnf("failed to get tcp6 stats from /proc: %v", err) } } else { found = true want6 = true } if found { m.rxQueue.Set(uint64(rx + rx6)) } case <-m.done: t.Stop() return } } } // Registry returns the monitoring registry of the TCP metricset. func (m *TCP) Registry() *monitoring.Registry { if m == nil { return nil } return m.monitorRegistry } // procNetTCP returns the rx_queue field of the TCP socket table for the // socket on the provided address formatted in hex, xxxxxxxx:xxxx or the IPv6 // equivalent. // This function is only useful on linux due to its dependence on the /proc // filesystem, but is kept in this file for simplicity. If hasUnspecified // is true, all addresses listed in the file in path are considered, and the // sum of rx_queue matching the addr ports is returned where the corresponding // addrIsUnspecified is true. func procNetTCP(path string, addr []string, hasUnspecified bool, addrIsUnspecified []bool) (rx int64, err error) { if len(addr) == 0 { return 0, nil } if len(addr) != len(addrIsUnspecified) { return 0, errors.New("mismatched address/unspecified lists: please report this") } b, err := os.ReadFile(path) if err != nil { return 0, err } lines := bytes.Split(b, []byte("\n")) if len(lines) < 2 { return 0, fmt.Errorf("%s entry not found for %s (no line)", path, addr) } var found bool for _, l := range lines[1:] { f := bytes.Fields(l) const queuesField = 4 if len(f) > queuesField && contains(f[1], addr, addrIsUnspecified) { _, r, ok := bytes.Cut(f[4], []byte(":")) if !ok { return 0, errors.New("no rx_queue field " + string(f[queuesField])) } found = true // queue lengths are hex, e.g.: // - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv4/tcp_ipv4.c#L2643 // - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv6/tcp_ipv6.c#L1987 v, err := strconv.ParseInt(string(r), 16, 64) if err != nil { return 0, fmt.Errorf("failed to parse rx_queue: %w", err) } rx += v if hasUnspecified { continue } return rx, nil } } if found { return rx, nil } return 0, fmt.Errorf("%s entry not found for %s", path, addr) } // Close closes the TCP metricset and unregister the metrics. func (m *TCP) Close() { if m == nil { return } if m.done != nil { // Shut down poller and wait until done before unregistering metrics. m.done <- struct{}{} } if m.unregister != nil { m.unregister() m.unregister = nil } m.monitorRegistry = nil }