receiver/hostmetricsreceiver/internal/scraper/networkscraper/network_scraper.go (163 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package networkscraper // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/networkscraper" import ( "context" "fmt" "time" "github.com/shirou/gopsutil/v4/host" "github.com/shirou/gopsutil/v4/net" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/scraper" "go.opentelemetry.io/collector/scraper/scrapererror" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterset" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/networkscraper/internal/metadata" ) const ( networkMetricsLen = 4 connectionsMetricsLen = 1 ) // scraper for Network Metrics type networkScraper struct { settings scraper.Settings config *Config mb *metadata.MetricsBuilder startTime pcommon.Timestamp includeFS filterset.FilterSet excludeFS filterset.FilterSet // for mocking bootTime func(context.Context) (uint64, error) ioCounters func(context.Context, bool) ([]net.IOCountersStat, error) connections func(context.Context, string) ([]net.ConnectionStat, error) conntrack func(context.Context) ([]net.FilterStat, error) } // newNetworkScraper creates a set of Network related metrics func newNetworkScraper(_ context.Context, settings scraper.Settings, cfg *Config) (*networkScraper, error) { scraper := &networkScraper{ settings: settings, config: cfg, bootTime: host.BootTimeWithContext, ioCounters: net.IOCountersWithContext, connections: net.ConnectionsWithContext, conntrack: net.FilterCountersWithContext, } var err error if len(cfg.Include.Interfaces) > 0 { scraper.includeFS, err = filterset.CreateFilterSet(cfg.Include.Interfaces, &cfg.Include.Config) if err != nil { return nil, fmt.Errorf("error creating network interface include filters: %w", err) } } if len(cfg.Exclude.Interfaces) > 0 { scraper.excludeFS, err = filterset.CreateFilterSet(cfg.Exclude.Interfaces, &cfg.Exclude.Config) if err != nil { return nil, fmt.Errorf("error creating network interface exclude filters: %w", err) } } return scraper, nil } func (s *networkScraper) start(ctx context.Context, _ component.Host) error { bootTime, err := s.bootTime(ctx) if err != nil { return err } s.startTime = pcommon.Timestamp(bootTime * 1e9) s.mb = metadata.NewMetricsBuilder(s.config.MetricsBuilderConfig, s.settings, metadata.WithStartTime(pcommon.Timestamp(bootTime*1e9))) return nil } func (s *networkScraper) scrape(ctx context.Context) (pmetric.Metrics, error) { var errors scrapererror.ScrapeErrors err := s.recordNetworkCounterMetrics(ctx) if err != nil { errors.AddPartial(networkMetricsLen, err) } err = s.recordNetworkConnectionsMetrics(ctx) if err != nil { errors.AddPartial(connectionsMetricsLen, err) } err = s.recordNetworkConntrackMetrics(ctx) if err != nil { errors.AddPartial(connectionsMetricsLen, err) } return s.mb.Emit(), errors.Combine() } func (s *networkScraper) recordNetworkCounterMetrics(ctx context.Context) error { now := pcommon.NewTimestampFromTime(time.Now()) // get total stats only ioCounters, err := s.ioCounters(ctx, true /*perNetworkInterfaceController=*/) if err != nil { return fmt.Errorf("failed to read network IO stats: %w", err) } // filter network interfaces by name ioCounters = s.filterByInterface(ioCounters) if len(ioCounters) > 0 { s.recordNetworkPacketsMetric(now, ioCounters) s.recordNetworkDroppedPacketsMetric(now, ioCounters) s.recordNetworkErrorPacketsMetric(now, ioCounters) s.recordNetworkIOMetric(now, ioCounters) } return nil } func (s *networkScraper) recordNetworkPacketsMetric(now pcommon.Timestamp, ioCountersSlice []net.IOCountersStat) { for _, ioCounters := range ioCountersSlice { s.mb.RecordSystemNetworkPacketsDataPoint(now, int64(ioCounters.PacketsSent), ioCounters.Name, metadata.AttributeDirectionTransmit) s.mb.RecordSystemNetworkPacketsDataPoint(now, int64(ioCounters.PacketsRecv), ioCounters.Name, metadata.AttributeDirectionReceive) } } func (s *networkScraper) recordNetworkDroppedPacketsMetric(now pcommon.Timestamp, ioCountersSlice []net.IOCountersStat) { for _, ioCounters := range ioCountersSlice { s.mb.RecordSystemNetworkDroppedDataPoint(now, int64(ioCounters.Dropout), ioCounters.Name, metadata.AttributeDirectionTransmit) s.mb.RecordSystemNetworkDroppedDataPoint(now, int64(ioCounters.Dropin), ioCounters.Name, metadata.AttributeDirectionReceive) } } func (s *networkScraper) recordNetworkErrorPacketsMetric(now pcommon.Timestamp, ioCountersSlice []net.IOCountersStat) { for _, ioCounters := range ioCountersSlice { s.mb.RecordSystemNetworkErrorsDataPoint(now, int64(ioCounters.Errout), ioCounters.Name, metadata.AttributeDirectionTransmit) s.mb.RecordSystemNetworkErrorsDataPoint(now, int64(ioCounters.Errin), ioCounters.Name, metadata.AttributeDirectionReceive) } } func (s *networkScraper) recordNetworkIOMetric(now pcommon.Timestamp, ioCountersSlice []net.IOCountersStat) { for _, ioCounters := range ioCountersSlice { s.mb.RecordSystemNetworkIoDataPoint(now, int64(ioCounters.BytesSent), ioCounters.Name, metadata.AttributeDirectionTransmit) s.mb.RecordSystemNetworkIoDataPoint(now, int64(ioCounters.BytesRecv), ioCounters.Name, metadata.AttributeDirectionReceive) } } func (s *networkScraper) recordNetworkConnectionsMetrics(ctx context.Context) error { if !s.config.Metrics.SystemNetworkConnections.Enabled { return nil } now := pcommon.NewTimestampFromTime(time.Now()) connections, err := s.connections(ctx, "tcp") if err != nil { return fmt.Errorf("failed to read TCP connections: %w", err) } tcpConnectionStatusCounts := getTCPConnectionStatusCounts(connections) s.recordNetworkConnectionsMetric(now, tcpConnectionStatusCounts) return nil } func getTCPConnectionStatusCounts(connections []net.ConnectionStat) map[string]int64 { tcpStatuses := make(map[string]int64, len(allTCPStates)) for _, state := range allTCPStates { tcpStatuses[state] = 0 } for _, connection := range connections { tcpStatuses[connection.Status]++ } return tcpStatuses } func (s *networkScraper) recordNetworkConnectionsMetric(now pcommon.Timestamp, connectionStateCounts map[string]int64) { for connectionState, count := range connectionStateCounts { s.mb.RecordSystemNetworkConnectionsDataPoint(now, count, metadata.AttributeProtocolTcp, connectionState) } } func (s *networkScraper) filterByInterface(ioCounters []net.IOCountersStat) []net.IOCountersStat { if s.includeFS == nil && s.excludeFS == nil { return ioCounters } filteredIOCounters := make([]net.IOCountersStat, 0, len(ioCounters)) for _, io := range ioCounters { if s.includeInterface(io.Name) { filteredIOCounters = append(filteredIOCounters, io) } } return filteredIOCounters } func (s *networkScraper) includeInterface(interfaceName string) bool { return (s.includeFS == nil || s.includeFS.Matches(interfaceName)) && (s.excludeFS == nil || !s.excludeFS.Matches(interfaceName)) }