func()

in pkg/accesslog/collector/ztunnel.go [67:110]


// Start initializes the collector and launches its background work.
//
// In order, it:
//  1. derives a cancellable context from ctx.RuntimeContext and stores it
//     (with its cancel function) on the collector;
//  2. stores ctx on the collector and passes z to
//     ctx.ConnectionMgr.RegisterNewFlushListener;
//  3. runs findZTunnelProcessAndCollect once, synchronously;
//  4. subscribes to ctx.BPF.ZtunnelLbSocketMappingEventQueue and records each
//     received mapping in ipMappingCache;
//  5. starts a goroutine that re-runs findZTunnelProcessAndCollect every
//     ZTunnelProcessFinderInterval until the derived context is done.
//
// mgr is not used by this method.
//
// An error is returned only when the initial findZTunnelProcessAndCollect call
// fails. In that case the derived context is cancelled before returning, so it
// is released from its parent instead of lingering until the parent ends.
// Failures of the periodic calls are logged and do not stop the loop.
func (z *ZTunnelCollector) Start(mgr *module.Manager, ctx *common.AccessLogContext) error {
	z.ctx, z.cancel = context.WithCancel(ctx.RuntimeContext)
	z.alc = ctx
	ctx.ConnectionMgr.RegisterNewFlushListener(z)

	if err := z.findZTunnelProcessAndCollect(); err != nil {
		// Nothing below gets started on this path, so release the derived
		// context now. Calling cancel again later is a harmless no-op.
		z.cancel()
		return err
	}

	ctx.BPF.ReadEventAsync(ctx.BPF.ZtunnelLbSocketMappingEventQueue, func(data interface{}) {
		// The factory passed below only ever creates *events.ZTunnelSocketMappingEvent.
		// NOTE(review): this assumes the reader hands that same value back — confirm.
		event := data.(*events.ZTunnelSocketMappingEvent)
		localIP := z.convertBPFIPToString(event.OriginalSrcIP)
		localPort := event.OriginalSrcPort
		remoteIP := z.convertBPFIPToString(event.OriginalDestIP)
		remotePort := event.OriginalDestPort
		lbIP := z.convertBPFIPToString(event.LoadBalancedDestIP)
		log.Debugf("received ztunnel lb socket mapping event: %s:%d -> %s:%d, lb: %s", localIP, localPort, remoteIP, remotePort, lbIP)

		// Key the load-balanced destination by the original source/destination
		// address pair. The last argument is ipMappingExpireDuration, presumably
		// the entry's time-to-live — verify against the cache implementation.
		key := z.buildIPMappingCacheKey(localIP, int(localPort), remoteIP, int(remotePort))
		z.ipMappingCache.Set(key, &ZTunnelLoadBalanceAddress{
			IP:   lbIP,
			Port: event.LoadBalancedDestPort,
		}, z.ipMappingExpireDuration)
	}, func() interface{} {
		return &events.ZTunnelSocketMappingEvent{}
	})

	go func() {
		ticker := time.NewTicker(ZTunnelProcessFinderInterval)
		// Deferred so the ticker is released on every exit from this goroutine.
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				if err := z.findZTunnelProcessAndCollect(); err != nil {
					log.Error("failed to find and collect ztunnel process: ", err)
				}
			case <-z.ctx.Done():
				return
			}
		}
	}()
	return nil
}