func()

in pkg/ebpf/conntrack/conntrack_client.go [53:193]


func (c *conntrackClient) CleanupConntrackMap() {
	c.logger.Info("Check for any stale entries in the conntrack map")
	bpfMapApi := &goebpfmaps.BpfMap{}
	mapInfo, err := bpfMapApi.GetMapFromPinPath(CONNTRACK_MAP_PIN_PATH)
	if err != nil {
		c.logger.Info("Failed to get mapInfo for conntrack pinpath")
		return
	}
	mapID := int(mapInfo.Id)

	// Read from eBPF Table if local conntrack table is not cached
	if c.hydratelocalConntrack {
		//Lets cleanup all entries in cache
		c.InitializeLocalCache()

		iterKey := utils.ConntrackKey{}
		iterNextKey := utils.ConntrackKey{}
		err = goebpfmaps.GetFirstMapEntryByID(uintptr(unsafe.Pointer(&iterKey)), mapID)
		if err != nil {
			return
		} else {
			for {
				iterValue := utils.ConntrackVal{}
				err = goebpfmaps.GetMapEntryByID(uintptr(unsafe.Pointer(&iterKey)), uintptr(unsafe.Pointer(&iterValue)), mapID)
				if err != nil {
					if errors.Is(err, unix.ENOENT) {
						err = nil
						break
					}
					return
				} else {

					newKey := utils.ConntrackKey{}
					newKey.Source_ip = iterKey.Source_ip
					newKey.Source_port = iterKey.Source_port
					newKey.Dest_ip = iterKey.Dest_ip
					newKey.Dest_port = iterKey.Dest_port
					newKey.Protocol = iterKey.Protocol
					newKey.Owner_ip = iterKey.Owner_ip
					c.localConntrackV4Cache[newKey] = true
				}
				err = goebpfmaps.GetNextMapEntryByID(uintptr(unsafe.Pointer(&iterKey)), uintptr(unsafe.Pointer(&iterNextKey)), mapID)
				if errors.Is(err, unix.ENOENT) {
					err = nil
					break
				}
				if err != nil {
					break
				}

				iterKey = iterNextKey
			}
		}
		c.logger.Info("hydrated local conntrack cache")
		c.hydratelocalConntrack = false
	} else {
		// Conntrack table is already hydrated from previous run
		// So read from kernel conntrack table
		conntrackFlows, err := netlink.ConntrackTableList(netlink.ConntrackTable, unix.AF_INET)
		if err != nil {
			c.logger.Info("Failed to read from conntrack table")
			return
		}
		kernelConntrackV4Cache := make(map[utils.ConntrackKey]bool)
		// Build kernel conntrack cache
		for _, conntrackFlow := range conntrackFlows {
			//Check fwd flow with SIP as owner
			fwdFlowWithSIP := utils.ConntrackKey{}
			fwdFlowWithSIP.Source_ip = utils.ConvIPv4ToInt(conntrackFlow.Forward.SrcIP)
			fwdFlowWithSIP.Source_port = conntrackFlow.Forward.SrcPort
			fwdFlowWithSIP.Dest_ip = utils.ConvIPv4ToInt(conntrackFlow.Forward.DstIP)
			fwdFlowWithSIP.Dest_port = conntrackFlow.Forward.DstPort
			fwdFlowWithSIP.Protocol = conntrackFlow.Forward.Protocol
			fwdFlowWithSIP.Owner_ip = fwdFlowWithSIP.Source_ip

			kernelConntrackV4Cache[fwdFlowWithSIP] = true

			//Check fwd flow with DIP as owner
			fwdFlowWithDIP := utils.ConntrackKey{}
			fwdFlowWithDIP.Source_ip = utils.ConvIPv4ToInt(conntrackFlow.Forward.SrcIP)
			fwdFlowWithDIP.Source_port = conntrackFlow.Forward.SrcPort
			fwdFlowWithDIP.Dest_ip = utils.ConvIPv4ToInt(conntrackFlow.Forward.DstIP)
			fwdFlowWithDIP.Dest_port = conntrackFlow.Forward.DstPort
			fwdFlowWithDIP.Protocol = conntrackFlow.Forward.Protocol
			fwdFlowWithDIP.Owner_ip = fwdFlowWithDIP.Dest_ip

			kernelConntrackV4Cache[fwdFlowWithDIP] = true

			//Dest can be VIP and pods can be on same node
			destIP := net.ParseIP(conntrackFlow.Forward.DstIP.String())
			revDestIP := net.ParseIP(conntrackFlow.Reverse.SrcIP.String())

			if !destIP.Equal(revDestIP) {
				//Check fwd flow with SIP as owner
				revFlowWithSIP := utils.ConntrackKey{}
				revFlowWithSIP.Source_ip = utils.ConvIPv4ToInt(conntrackFlow.Forward.SrcIP)
				revFlowWithSIP.Source_port = conntrackFlow.Forward.SrcPort
				revFlowWithSIP.Dest_ip = utils.ConvIPv4ToInt(conntrackFlow.Reverse.SrcIP)
				revFlowWithSIP.Dest_port = conntrackFlow.Reverse.SrcPort
				revFlowWithSIP.Protocol = conntrackFlow.Forward.Protocol
				revFlowWithSIP.Owner_ip = revFlowWithSIP.Source_ip

				kernelConntrackV4Cache[revFlowWithSIP] = true

				//Check fwd flow with DIP as owner
				revFlowWithDIP := utils.ConntrackKey{}
				revFlowWithDIP.Source_ip = utils.ConvIPv4ToInt(conntrackFlow.Forward.SrcIP)
				revFlowWithDIP.Source_port = conntrackFlow.Forward.SrcPort
				revFlowWithDIP.Dest_ip = utils.ConvIPv4ToInt(conntrackFlow.Reverse.SrcIP)
				revFlowWithDIP.Dest_port = conntrackFlow.Reverse.SrcPort
				revFlowWithDIP.Protocol = conntrackFlow.Forward.Protocol
				revFlowWithDIP.Owner_ip = revFlowWithDIP.Dest_ip

				kernelConntrackV4Cache[revFlowWithDIP] = true
			}
		}
		// Check if the local cache and kernel cache is in sync
		for localConntrackEntry, _ := range c.localConntrackV4Cache {
			newKey := utils.ConntrackKey{}
			newKey.Source_ip = utils.ConvIPv4ToInt(utils.ConvIntToIPv4(localConntrackEntry.Source_ip))
			newKey.Source_port = localConntrackEntry.Source_port
			newKey.Dest_ip = utils.ConvIPv4ToInt(utils.ConvIntToIPv4(localConntrackEntry.Dest_ip))
			newKey.Dest_port = localConntrackEntry.Dest_port
			newKey.Protocol = localConntrackEntry.Protocol
			newKey.Owner_ip = utils.ConvIPv4ToInt(utils.ConvIntToIPv4(localConntrackEntry.Owner_ip))
			_, ok := kernelConntrackV4Cache[newKey]
			if !ok {
				// Delete the entry in local cache since kernel entry is still missing so expired case
				expiredFlow := localConntrackEntry
				key := fmt.Sprintf("Conntrack Key : Source IP - %s Source port - %d Dest IP - %s Dest port - %d Protocol - %d Owner IP - %s", utils.ConvIntToIPv4(expiredFlow.Source_ip).String(), expiredFlow.Source_port, utils.ConvIntToIPv4(expiredFlow.Dest_ip).String(), expiredFlow.Dest_port, expiredFlow.Protocol, utils.ConvIntToIPv4(expiredFlow.Owner_ip).String())
				c.logger.Info("Conntrack cleanup", "Delete - ", key)
				c.conntrackMap.DeleteMapEntry(uintptr(unsafe.Pointer(&expiredFlow)))

			}
		}
		//c.localConntrackV4Cache = make(map[utils.ConntrackKey]bool)
		c.logger.Info("Done cleanup of conntrack map")
		c.hydratelocalConntrack = true
	}
	return
}