func()

in pkg/accesslog/collector/connection.go [61:124]


// Start validates the connection-analyze configuration, creates and starts the
// event queue that receives socket connect/close events from BPF, and registers
// the tracepoints and kprobes used to observe connections.
//
// It returns an error when the per CPU buffer size cannot be parsed or is
// smaller than the OS page size, or when the parse parallels, analyze parallels
// or queue size is smaller than 1. Otherwise it returns nil.
func (c *ConnectCollector) Start(m *module.Manager, ctx *common.AccessLogContext) error {
	conf := ctx.Config.ConnectionAnalyze

	perCPUBufferSize, err := units.RAMInBytes(conf.PerCPUBufferSize)
	if err != nil {
		return fmt.Errorf("parsing the per CPU buffer size %q: %w", conf.PerCPUBufferSize, err)
	}
	// Convert once; the same value is validated here and handed to both receivers.
	bufferSize := int(perCPUBufferSize)
	if pageSize := os.Getpagesize(); bufferSize < pageSize {
		// A buffer exactly one page large is accepted, hence "at least".
		return fmt.Errorf("the per CPU buffer size must be at least %d bytes", pageSize)
	}
	if conf.ParseParallels < 1 {
		return fmt.Errorf("the parse parallels cannot be smaller than 1")
	}
	if conf.AnalyzeParallels < 1 {
		return fmt.Errorf("the analyze parallels cannot be smaller than 1")
	}
	if conf.QueueSize < 1 {
		return fmt.Errorf("the queue size cannot be smaller than 1")
	}

	c.eventQueue = btf.NewEventQueue("connection resolver", conf.AnalyzeParallels, conf.QueueSize,
		func(num int) btf.PartitionContext {
			// The type assertion is unchecked: it panics if the process module is
			// missing or does not implement process.K8sOperator.
			return NewConnectionPartitionContext(ctx, m.FindModule(process.ModuleName).(process.K8sOperator))
		})

	// Each receiver supplies a factory for an empty event and a function that
	// extracts the connection ID from a decoded event.
	// NOTE(review): the ID is presumably used to pick the partition — confirm in btf.
	c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, bufferSize, conf.ParseParallels,
		func() interface{} {
			return &events.SocketConnectEvent{}
		}, func(data interface{}) int {
			return int(data.(*events.SocketConnectEvent).ConID)
		})
	c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue, bufferSize, conf.ParseParallels,
		func() interface{} {
			return &events.SocketCloseEvent{}
		}, func(data interface{}) int {
			return int(data.(*events.SocketCloseEvent).ConnectionID)
		})
	c.eventQueue.Start(ctx.RuntimeContext, ctx.BPF.Linker)

	// accept and accept4 share the same enter/exit programs.
	ctx.BPF.AddTracePoint("syscalls", "sys_enter_connect", ctx.BPF.TracepointEnterConnect)
	ctx.BPF.AddTracePoint("syscalls", "sys_exit_connect", ctx.BPF.TracepointExitConnect)
	ctx.BPF.AddTracePoint("syscalls", "sys_enter_accept", ctx.BPF.TracepointEnterAccept)
	ctx.BPF.AddTracePoint("syscalls", "sys_exit_accept", ctx.BPF.TracepointExitAccept)
	ctx.BPF.AddTracePoint("syscalls", "sys_enter_accept4", ctx.BPF.TracepointEnterAccept)
	ctx.BPF.AddTracePoint("syscalls", "sys_exit_accept4", ctx.BPF.TracepointExitAccept)
	ctx.BPF.AddTracePoint("syscalls", "sys_enter_close", ctx.BPF.TracepointEnterClose)
	ctx.BPF.AddTracePoint("syscalls", "sys_exit_close", ctx.BPF.TracepointExitClose)

	ctx.BPF.AddLink(link.Kprobe, map[string]*ebpf.Program{
		"tcp_connect": ctx.BPF.TcpConnect,
	})
	ctx.BPF.AddLink(link.Kretprobe, map[string]*ebpf.Program{
		"sock_alloc": ctx.BPF.SockAllocRet,
	})
	ctx.BPF.AddLink(link.Kprobe, map[string]*ebpf.Program{
		"ip4_datagram_connect": ctx.BPF.Ip4UdpDatagramConnect,
	})

	// The conntrack probes are attached on a best-effort basis: their errors are
	// deliberately discarded so a failure here does not fail Start.
	// NOTE(review): presumably these symbols are absent when the conntrack
	// module is not loaded — confirm.
	_ = ctx.BPF.AddLinkOrError(link.Kprobe, map[string]*ebpf.Program{
		"__nf_conntrack_hash_insert": ctx.BPF.NfConntrackHashInsert,
	})
	_ = ctx.BPF.AddLinkOrError(link.Kprobe, map[string]*ebpf.Program{
		"nf_confirm": ctx.BPF.NfConfirm,
	})
	_ = ctx.BPF.AddLinkOrError(link.Kprobe, map[string]*ebpf.Program{
		"ctnetlink_fill_info": ctx.BPF.NfCtnetlinkFillInfo,
	})
	return nil
}