metricbeat/module/system/socket/socket.go (281 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//go:build linux
package socket
import (
"fmt"
"net"
"os"
"os/user"
"strconv"
"syscall"
sock "github.com/elastic/beats/v7/metricbeat/helper/socket"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-system-metrics/metric/system/resolve"
"github.com/elastic/gosigar/sys/linux"
)
var (
debugSelector = "system.socket"
)
func init() {
mb.Registry.MustAddMetricSet("system", "socket", New,
mb.WithHostParser(parse.EmptyHostParser),
)
}
// MetricSet holds any configuration or state information. It must implement
// the mb.MetricSet interface. And this is best achieved by embedding
// mb.BaseMetricSet because it implements all of the required mb.MetricSet
// interface methods except for Fetch.
type MetricSet struct {
mb.BaseMetricSet
netlink *sock.NetlinkSession
ptable *sock.ProcTable
euid int
previousConns hashSet
currentConns hashSet
reverseLookup *ReverseLookupCache
listeners *sock.ListenerTable
users UserCache
}
// New creates a new instance of the MetricSet. New is responsible for unpacking
// any MetricSet specific configuration options if there are any.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
sys := base.Module().(resolve.Resolver)
c := defaultConfig
if err := base.Module().UnpackConfig(&c); err != nil {
return nil, err
}
ptable, err := sock.NewProcTable(sys.ResolveHostFS("/proc"))
if err != nil {
return nil, err
}
if !ptable.Privileged() {
logp.Info("socket process info will only be available for processes owned by the %v user "+
"because this Beat is not running with enough privileges", os.Geteuid())
}
m := &MetricSet{
BaseMetricSet: base,
netlink: sock.NewNetlinkSession(),
ptable: ptable,
euid: os.Geteuid(),
previousConns: hashSet{},
currentConns: hashSet{},
listeners: sock.NewListenerTable(),
users: NewUserCache(),
}
if c.ReverseLookup.IsEnabled() {
var successTTL, failureTTL = defSuccessTTL, defFailureTTL
if c.ReverseLookup.SuccessTTL != 0 {
successTTL = c.ReverseLookup.SuccessTTL
}
if c.ReverseLookup.FailureTTL != 0 {
successTTL = c.ReverseLookup.FailureTTL
}
base.Logger().Named(debugSelector).Debugf("enabled reverse DNS lookup with cache TTL of %v/%v",
successTTL, failureTTL)
m.reverseLookup = NewReverseLookupCache(successTTL, failureTTL)
}
return m, nil
}
// Fetch socket metrics from the system
func (m *MetricSet) Fetch(r mb.ReporterV2) error {
// Refresh inode to process mapping (must be root).
if err := m.ptable.Refresh(); err != nil {
m.Logger().Named(debugSelector).Debugf("process table refresh had failures: %v", err)
}
sockets, err := m.netlink.GetSocketList()
if err != nil {
return fmt.Errorf("failed requesting socket dump: %w", err)
}
m.Logger().Named(debugSelector).Debugf("netlink returned %d sockets", len(sockets))
// Filter sockets that were known during the previous poll.
sockets = m.filterAndRememberSockets(sockets)
// Enrich sockets with direction/pid/process/user/hostname and convert to MapStr.
for _, s := range sockets {
c := newConnection(s)
m.enrichConnectionData(c)
root, metricSet := c.ToMapStr()
isOpen := r.Event(mb.Event{
RootFields: root,
MetricSetFields: metricSet,
})
if !isOpen {
return nil
}
}
// Set the "previous" connections set to the "current" connections.
tmp := m.previousConns
m.previousConns = m.currentConns
m.currentConns = tmp.Reset()
// Reset the listeners for the next iteration.
m.listeners.Reset()
return nil
}
// filterAndRememberSockets filters sockets to remove sockets that were seen
// during the last poll. It stores all of the sockets it sees for the next
// poll.
func (m *MetricSet) filterAndRememberSockets(sockets ...[]*linux.InetDiagMsg) []*linux.InetDiagMsg {
var newSockets []*linux.InetDiagMsg
for _, list := range sockets {
for _, socket := range list {
// Register all listening sockets.
if socket.DstPort() == 0 {
m.listeners.Put(uint8(syscall.IPPROTO_TCP), socket.SrcIP(), socket.SrcPort())
}
// Filter known sockets.
if m.isNewSocket(socket) {
m.Logger().Named(debugSelector).Debugf("found new socket %v:%v -> %v:%v with state=%v, inode=%v, hash-id=%d",
socket.SrcIP(), socket.SrcPort(),
socket.DstIP(), socket.DstPort(),
linux.TCPState(socket.State), socket.Inode, socket.FastHash())
newSockets = append(newSockets, socket)
}
}
}
return newSockets
}
// isNewSocket returns true if the socket is new since the last poll.
func (m *MetricSet) isNewSocket(diag *linux.InetDiagMsg) bool {
// Don't use the socket's inode for deduplication because once the socket
// is closing the inode goes to 0.
key := diag.FastHash()
m.currentConns.Add(key)
return !m.previousConns.Contains(key)
}
// enrichConnectionData enriches the connection with username, direction,
// hostname of the remote IP (if enabled), eTLD + 1 of the hostname, and the
// process owning the socket.
func (m *MetricSet) enrichConnectionData(c *connection) {
c.User = m.users.LookupUID(int(c.UID))
// Determine direction (incoming, outgoing, or listening).
c.Direction = m.listeners.Direction(uint8(c.Family), uint8(syscall.IPPROTO_TCP),
c.LocalIP, c.LocalPort, c.RemoteIP, c.RemotePort)
// Reverse DNS lookup on the remote IP.
if m.reverseLookup != nil && c.Direction != sock.Listening {
hostname, err := m.reverseLookup.Lookup(c.RemoteIP)
if err != nil {
c.DestHostError = err
} else {
c.DestHost = hostname
c.DestHostETLDPlusOne, _ = etldPlusOne(hostname)
}
}
// Add process info by finding the process that holds the socket's inode.
if proc := m.ptable.ProcessBySocketInode(c.Inode); proc != nil {
c.PID = proc.PID
c.Exe = proc.Executable
c.Command = proc.Command
c.CmdLine = proc.CmdLine
c.Args = proc.Args
} else if m.ptable.Privileged() {
if c.Inode == 0 {
c.ProcessError = fmt.Errorf("process has exited. inode=%v, tcp_state=%v",
c.Inode, c.State)
} else {
c.ProcessError = fmt.Errorf("process not found. inode=%v, tcp_state=%v",
c.Inode, c.State)
}
}
}
type connection struct {
Family linux.AddressFamily
LocalIP net.IP
LocalPort int
RemoteIP net.IP
RemotePort int
State linux.TCPState
Direction sock.Direction
DestHost string // Reverse lookup of dest IP.
DestHostETLDPlusOne string
DestHostError error // Resolver error.
// Process identifiers.
Inode uint32 // Inode of the socket.
PID int // PID of the socket owner.
Exe string // Absolute path to the executable.
Command string // Command
CmdLine string // Full command line with arguments.
Args []string // Raw arguments
ProcessError error // Reason process info is unavailable.
// User identifiers.
UID uint32 // UID of the socket owner.
User *user.User // Owner of the socket.
}
func newConnection(diag *linux.InetDiagMsg) *connection {
return &connection{
Family: linux.AddressFamily(diag.Family),
State: linux.TCPState(diag.State),
LocalIP: diag.SrcIP(),
LocalPort: diag.SrcPort(),
RemoteIP: diag.DstIP(),
RemotePort: diag.DstPort(),
Inode: diag.Inode,
UID: diag.UID,
PID: -1,
}
}
// Map helpers for conversion to event
var (
ianaNumbersMap = map[string]string{
"ipv4": "4",
"ipv6": "41",
}
localHostInfoGroup = map[string]string{
sock.IngressName: "destination",
sock.EgressName: "source",
sock.ListeningName: "server",
}
remoteHostInfoGroup = map[string]string{
sock.IngressName: "source",
sock.EgressName: "destination",
}
)
func (c *connection) ToMapStr() (fields mapstr.M, metricSetFields mapstr.M) {
localGroup := "server"
if g, ok := localHostInfoGroup[c.Direction.String()]; ok {
localGroup = g
}
fields = mapstr.M{
"network": mapstr.M{
"type": c.Family.String(),
"iana_number": ianaNumbersMap[c.Family.String()],
"direction": c.Direction.String(),
},
"user": mapstr.M{
"id": strconv.Itoa(int(c.UID)),
},
// Aliases for this are not going to be possible, keeping
// duplicated fields by now for backwards comatibility
localGroup: mapstr.M{
"ip": c.LocalIP.String(),
"port": c.LocalPort,
},
}
metricSetFields = mapstr.M{
"local": mapstr.M{
"ip": c.LocalIP.String(),
"port": c.LocalPort,
},
}
if c.User.Username != "" {
fields.Put("user.name", c.User.Username)
}
if c.User.Name != "" {
fields.Put("user.full_name", c.User.Name)
}
if c.ProcessError != nil {
fields.Put("error.code", c.ProcessError.Error())
} else {
process := mapstr.M{"pid": c.PID}
if c.PID > 0 {
addOptionalString(process, "executable", c.Exe)
addOptionalString(process, "name", c.Command)
if len(c.Args) >= 0 {
process["args"] = c.Args
metricSetFields["process"] = mapstr.M{
"cmdline": c.CmdLine,
}
}
} else if c.PID == 0 {
process["command"] = "kernel"
}
if c.PID >= 0 {
fields["process"] = process
}
}
if c.RemotePort != 0 {
// Aliases for this are not going to be possible, keeping
// duplicated fields by now for backwards comatibility
remote := mapstr.M{
"ip": c.RemoteIP.String(),
"port": c.RemotePort,
}
if c.DestHostError != nil {
remote["host_error"] = c.DestHostError.Error()
} else {
addOptionalString(remote, "host", c.DestHost)
addOptionalString(remote, "etld_plus_one", c.DestHostETLDPlusOne)
}
metricSetFields["remote"] = remote
remoteGroup, ok := remoteHostInfoGroup[c.Direction.String()]
if ok {
fields[remoteGroup] = mapstr.M{
"ip": c.RemoteIP.String(),
"port": c.RemotePort,
}
}
}
return fields, metricSetFields
}
func addOptionalString(m mapstr.M, key, value string) {
if value == "" {
return
}
m[key] = value
}