in pkg/accesslog/collector/ztunnel.go [67:110]
func (z *ZTunnelCollector) Start(mgr *module.Manager, ctx *common.AccessLogContext) error {
z.ctx, z.cancel = context.WithCancel(ctx.RuntimeContext)
z.alc = ctx
ctx.ConnectionMgr.RegisterNewFlushListener(z)
err := z.findZTunnelProcessAndCollect()
if err != nil {
return err
}
ctx.BPF.ReadEventAsync(ctx.BPF.ZtunnelLbSocketMappingEventQueue, func(data interface{}) {
event := data.(*events.ZTunnelSocketMappingEvent)
localIP := z.convertBPFIPToString(event.OriginalSrcIP)
localPort := event.OriginalSrcPort
remoteIP := z.convertBPFIPToString(event.OriginalDestIP)
remotePort := event.OriginalDestPort
lbIP := z.convertBPFIPToString(event.LoadBalancedDestIP)
log.Debugf("received ztunnel lb socket mapping event: %s:%d -> %s:%d, lb: %s", localIP, localPort, remoteIP, remotePort, lbIP)
key := z.buildIPMappingCacheKey(localIP, int(localPort), remoteIP, int(remotePort))
z.ipMappingCache.Set(key, &ZTunnelLoadBalanceAddress{
IP: lbIP,
Port: event.LoadBalancedDestPort,
}, z.ipMappingExpireDuration)
}, func() interface{} {
return &events.ZTunnelSocketMappingEvent{}
})
go func() {
ticker := time.NewTicker(ZTunnelProcessFinderInterval)
for {
select {
case <-ticker.C:
err := z.findZTunnelProcessAndCollect()
if err != nil {
log.Error("failed to find and collect ztunnel process: ", err)
}
case <-z.ctx.Done():
ticker.Stop()
return
}
}
}()
return nil
}