psnotify/psnotify_linux.go (180 lines of code) (raw):
// Copyright (c) 2012 VMware, Inc.
// Go interface to the Linux netlink process connector.
// See Documentation/connector/connector.txt in the linux kernel source tree.
package psnotify
import (
"bytes"
"encoding/binary"
"os"
"syscall"
"github.com/elastic/gosigar/sys"
)
// Connector addressing (from <linux/connector.h>).
const (
	_CN_IDX_PROC = 0x1
	_CN_VAL_PROC = 0x1
)

// Multicast control operations understood by the proc connector
// (from <linux/cn_proc.h>).
const (
	_PROC_CN_MCAST_LISTEN = 1
	_PROC_CN_MCAST_IGNORE = 2
)

// Process event flags (from <linux/cn_proc.h>). Each event is a single bit,
// so flags can be OR-ed together when registering a watch.
const (
	PROC_EVENT_FORK = 1 << 0  // fork() events
	PROC_EVENT_EXEC = 1 << 1  // exec() events
	PROC_EVENT_EXIT = 1 << 31 // exit() events

	// PROC_EVENT_ALL watches for all of the process events above.
	PROC_EVENT_ALL = PROC_EVENT_FORK | PROC_EVENT_EXEC | PROC_EVENT_EXIT
)
// byteOrder is the byte order, as reported by sys.GetEndian(), used to
// encode and decode every netlink/connector structure in this file.
var byteOrder = sys.GetEndian()
// cbId mirrors struct cb_id from <linux/connector.h>.
// Field order and widths must match the kernel layout because the struct is
// (de)serialised with encoding/binary.
type cbId struct {
	Idx, Val uint32
}
// cnMsg mirrors the fixed-size header of struct cn_msg from
// <linux/connector.h>. Len is the size in bytes of the payload that follows
// the header on the wire.
type cnMsg struct {
	Id         cbId
	Seq, Ack   uint32
	Len, Flags uint16
}
// procEventHeader mirrors the leading fields of struct proc_event from
// <linux/cn_proc.h> (what, cpu, timestamp_ns). What selects which
// event-specific payload follows the header.
type procEventHeader struct {
	What, Cpu uint32
	Timestamp uint64
}
// forkProcEvent mirrors the fork member of struct proc_event from
// <linux/cn_proc.h>: pid/tgid of the parent followed by pid/tgid of the child.
type forkProcEvent struct {
	ParentPid, ParentTgid uint32
	ChildPid, ChildTgid   uint32
}
// execProcEvent mirrors the exec member of struct proc_event from
// <linux/cn_proc.h>.
type execProcEvent struct {
	ProcessPid, ProcessTgid uint32
}
// exitProcEvent mirrors the exit member of struct proc_event from
// <linux/cn_proc.h>.
type exitProcEvent struct {
	ProcessPid, ProcessTgid uint32
	ExitCode, ExitSignal    uint32
}
// netlinkProcMessage is the fixed prefix of a control message sent to the
// proc connector: the standard netlink header immediately followed by the
// connector header. The operation code is appended after it by send().
type netlinkProcMessage struct {
	// Header is the standard netlink message header.
	Header syscall.NlMsghdr
	// Data is the connector (cn_msg) header.
	Data cnMsg
}
// netlinkListener holds the state of the netlink connector socket used to
// receive process events.
type netlinkListener struct {
	// addr is the netlink socket address the socket is bound to and that
	// control messages are sent to.
	addr *syscall.SockaddrNetlink
	// sock is the file descriptor returned by syscall.Socket().
	sock int
	// seq is the sequence number of the last control message sent; it is
	// copied into the netlink header by send().
	seq uint32
}
// createListener initializes the linux implementation of the eventListener
// interface by binding a netlink connector socket.
//
// On failure it returns a nil listener together with the error, so callers
// never receive a half-initialised listener.
func createListener() (eventListener, error) {
	listener := &netlinkListener{}
	if err := listener.bind(); err != nil {
		return nil, err
	}
	return listener, nil
}
// unregister is a no-op on linux: the netlink connector delivers events for
// all pids and filtering happens against the watch table in handleEvent, so
// there is nothing to undo per pid. It always returns nil.
func (w *Watcher) unregister(pid int) error {
	return nil
}
// register is a no-op on linux: the netlink connector delivers events for
// all pids and filtering happens against the watch table in handleEvent, so
// no per-pid kernel registration is needed. It always returns nil.
func (w *Watcher) register(pid int, flags uint32) error {
	return nil
}
// readEvents reads messages from the netlink socket until w.isDone() reports
// true, handing every NLMSG_DONE message to handleEvent.
//
// Receive errors, messages shorter than a netlink header and messages that
// fail to parse are reported on w.Error; the loop then carries on with the
// next message. If the watcher's listener is not a *netlinkListener there is
// no socket to read from, so syscall.EINVAL is reported and the function
// returns instead of dereferencing a nil listener.
func (w *Watcher) readEvents() {
	listener, ok := w.listener.(*netlinkListener)
	if !ok {
		w.Error <- syscall.EINVAL
		return
	}

	buf := make([]byte, syscall.Getpagesize())

	for !w.isDone() {
		nr, _, err := syscall.Recvfrom(listener.sock, buf, 0)
		if err != nil {
			w.Error <- err
			continue
		}
		if nr < syscall.NLMSG_HDRLEN {
			w.Error <- syscall.EINVAL
			continue
		}

		msgs, err := syscall.ParseNetlinkMessage(buf[:nr])
		if err != nil {
			w.Error <- err
			continue
		}

		for _, m := range msgs {
			// Only NLMSG_DONE messages are dispatched as process events.
			if m.Header.Type == syscall.NLMSG_DONE {
				w.handleEvent(m.Data)
			}
		}
	}
}
// isWatching reports whether pid is in the watch table and its watch has
// every bit of event set in its flags.
func (w *Watcher) isWatching(pid int, event uint32) bool {
	watch, found := w.watches[pid]
	if !found {
		return false
	}
	return watch.flags&event == event
}
// handleEvent decodes one connector message and dispatches it to the Fork,
// Exec or Exit channel.
//
// Unlike bsd kqueue, netlink receives events for all pids, so events are
// filtered against the watch table via isWatching(). Events whose type is
// not fork, exec or exit are ignored.
//
// data is the payload of a netlink message: a cn_msg header, the common
// proc_event header and the event-specific payload. If data is too short to
// hold any of these, syscall.EINVAL is reported on w.Error and the message is
// dropped, rather than being dispatched with zero-valued pids.
func (w *Watcher) handleEvent(data []byte) {
	r := bytes.NewReader(data)

	// decode reads the next fixed-size struct from the message. On a
	// truncated message it reports the failure and returns false.
	decode := func(v interface{}) bool {
		if err := binary.Read(r, byteOrder, v); err != nil {
			w.Error <- syscall.EINVAL
			return false
		}
		return true
	}

	var (
		msg cnMsg // connector header; decoded only to advance past it
		hdr procEventHeader
	)
	if !decode(&msg) || !decode(&hdr) {
		return
	}

	switch hdr.What {
	case PROC_EVENT_FORK:
		var event forkProcEvent
		if !decode(&event) {
			return
		}
		ppid := int(event.ParentTgid)
		pid := int(event.ChildTgid)

		if w.isWatching(ppid, PROC_EVENT_EXEC) {
			// follow forks: the child is watched with the parent's flags
			watch := w.watches[ppid]
			w.Watch(pid, watch.flags)
		}
		if w.isWatching(ppid, PROC_EVENT_FORK) {
			w.Fork <- &ProcEventFork{ParentPid: ppid, ChildPid: pid}
		}

	case PROC_EVENT_EXEC:
		var event execProcEvent
		if !decode(&event) {
			return
		}
		pid := int(event.ProcessTgid)

		if w.isWatching(pid, PROC_EVENT_EXEC) {
			w.Exec <- &ProcEventExec{Pid: pid}
		}

	case PROC_EVENT_EXIT:
		var event exitProcEvent
		if !decode(&event) {
			return
		}
		pid := int(event.ProcessTgid)

		if w.isWatching(pid, PROC_EVENT_EXIT) {
			// The process is gone, so drop its watch before notifying.
			w.RemoveWatch(pid)
			w.Exit <- &ProcEventExit{Pid: pid}
		}
	}
}
// bind creates the netlink connector socket, binds it to the proc connector
// multicast group and sends a listen control message to the connector driver.
//
// The socket is created close-on-exec so it is not inherited by processes
// the host program executes. If binding or sending the listen message fails,
// the socket is closed and listener.sock is set to -1, so the descriptor is
// not leaked and a later close() cannot close an unrelated descriptor that
// reused the same number.
func (listener *netlinkListener) bind() error {
	sock, err := syscall.Socket(
		syscall.AF_NETLINK,
		syscall.SOCK_DGRAM|syscall.SOCK_CLOEXEC,
		syscall.NETLINK_CONNECTOR)
	if err != nil {
		return err
	}

	// send() reads both fields, so they must be set before the listen
	// message goes out.
	listener.sock = sock
	listener.addr = &syscall.SockaddrNetlink{
		Family: syscall.AF_NETLINK,
		Groups: _CN_IDX_PROC,
	}

	if err = syscall.Bind(listener.sock, listener.addr); err == nil {
		err = listener.send(_PROC_CN_MCAST_LISTEN)
	}
	if err != nil {
		syscall.Close(sock)
		listener.sock = -1
		return err
	}

	return nil
}
// close sends an ignore control message to the connector driver and closes
// the netlink socket.
//
// The socket is closed even when the ignore message cannot be sent. The
// error from send takes precedence; otherwise the error from closing the
// socket, if any, is returned instead of being discarded.
func (listener *netlinkListener) close() error {
	err := listener.send(_PROC_CN_MCAST_IGNORE)
	if cerr := syscall.Close(listener.sock); err == nil {
		err = cerr
	}
	return err
}
// send transmits a control message to the connector driver, where op is one
// of PROC_CN_MCAST_{LISTEN,IGNORE}.
//
// The wire format is a netlink header, a connector header addressed to the
// proc connector, and op as the payload. Every call bumps listener.seq and
// stamps the new value into the netlink header.
func (listener *netlinkListener) send(op uint32) error {
	listener.seq++

	var msg netlinkProcMessage
	opSize := binary.Size(op)
	total := syscall.NLMSG_HDRLEN + uint32(binary.Size(msg.Data)+opSize)

	msg.Header = syscall.NlMsghdr{
		Len:   total,
		Type:  uint16(syscall.NLMSG_DONE),
		Flags: 0,
		Seq:   listener.seq,
		Pid:   uint32(os.Getpid()),
	}
	msg.Data.Id.Idx = _CN_IDX_PROC
	msg.Data.Id.Val = _CN_VAL_PROC
	msg.Data.Len = uint16(opSize)

	// Serialise the headers and the operation code back to back.
	var out bytes.Buffer
	out.Grow(int(total))
	binary.Write(&out, byteOrder, &msg)
	binary.Write(&out, byteOrder, op)

	return syscall.Sendto(listener.sock, out.Bytes(), 0, listener.addr)
}