in pkg/accesslog/collector/connection.go [61:124]
func (c *ConnectCollector) Start(m *module.Manager, ctx *common.AccessLogContext) error {
perCPUBufferSize, err := units.RAMInBytes(ctx.Config.ConnectionAnalyze.PerCPUBufferSize)
if err != nil {
return err
}
if int(perCPUBufferSize) < os.Getpagesize() {
return fmt.Errorf("the cpu buffer must bigger than %dB", os.Getpagesize())
}
if ctx.Config.ConnectionAnalyze.ParseParallels < 1 {
return fmt.Errorf("the parallels cannot be small than 1")
}
if ctx.Config.ConnectionAnalyze.AnalyzeParallels < 1 {
return fmt.Errorf("the parallels cannot be small than 1")
}
if ctx.Config.ConnectionAnalyze.QueueSize < 1 {
return fmt.Errorf("the queue size be small than 1")
}
c.eventQueue = btf.NewEventQueue("connection resolver", ctx.Config.ConnectionAnalyze.AnalyzeParallels,
ctx.Config.ConnectionAnalyze.QueueSize, func(num int) btf.PartitionContext {
return NewConnectionPartitionContext(ctx, m.FindModule(process.ModuleName).(process.K8sOperator))
})
c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, int(perCPUBufferSize),
ctx.Config.ConnectionAnalyze.ParseParallels, func() interface{} {
return &events.SocketConnectEvent{}
}, func(data interface{}) int {
return int(data.(*events.SocketConnectEvent).ConID)
})
c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue, int(perCPUBufferSize), ctx.Config.ConnectionAnalyze.ParseParallels, func() interface{} {
return &events.SocketCloseEvent{}
}, func(data interface{}) int {
return int(data.(*events.SocketCloseEvent).ConnectionID)
})
c.eventQueue.Start(ctx.RuntimeContext, ctx.BPF.Linker)
ctx.BPF.AddTracePoint("syscalls", "sys_enter_connect", ctx.BPF.TracepointEnterConnect)
ctx.BPF.AddTracePoint("syscalls", "sys_exit_connect", ctx.BPF.TracepointExitConnect)
ctx.BPF.AddTracePoint("syscalls", "sys_enter_accept", ctx.BPF.TracepointEnterAccept)
ctx.BPF.AddTracePoint("syscalls", "sys_exit_accept", ctx.BPF.TracepointExitAccept)
ctx.BPF.AddTracePoint("syscalls", "sys_enter_accept4", ctx.BPF.TracepointEnterAccept)
ctx.BPF.AddTracePoint("syscalls", "sys_exit_accept4", ctx.BPF.TracepointExitAccept)
ctx.BPF.AddTracePoint("syscalls", "sys_enter_close", ctx.BPF.TracepointEnterClose)
ctx.BPF.AddTracePoint("syscalls", "sys_exit_close", ctx.BPF.TracepointExitClose)
ctx.BPF.AddLink(link.Kprobe, map[string]*ebpf.Program{
"tcp_connect": ctx.BPF.TcpConnect,
})
ctx.BPF.AddLink(link.Kretprobe, map[string]*ebpf.Program{
"sock_alloc": ctx.BPF.SockAllocRet,
})
ctx.BPF.AddLink(link.Kprobe, map[string]*ebpf.Program{
"ip4_datagram_connect": ctx.BPF.Ip4UdpDatagramConnect,
})
_ = ctx.BPF.AddLinkOrError(link.Kprobe, map[string]*ebpf.Program{
"__nf_conntrack_hash_insert": ctx.BPF.NfConntrackHashInsert,
})
_ = ctx.BPF.AddLinkOrError(link.Kprobe, map[string]*ebpf.Program{
"nf_confirm": ctx.BPF.NfConfirm,
})
_ = ctx.BPF.AddLinkOrError(link.Kprobe, map[string]*ebpf.Program{
"ctnetlink_fill_info": ctx.BPF.NfCtnetlinkFillInfo,
})
return nil
}