in pkg/ebpf/conntrack/conntrack_client.go [53:193]
func (c *conntrackClient) CleanupConntrackMap() {
c.logger.Info("Check for any stale entries in the conntrack map")
bpfMapApi := &goebpfmaps.BpfMap{}
mapInfo, err := bpfMapApi.GetMapFromPinPath(CONNTRACK_MAP_PIN_PATH)
if err != nil {
c.logger.Info("Failed to get mapInfo for conntrack pinpath")
return
}
mapID := int(mapInfo.Id)
// Read from eBPF Table if local conntrack table is not cached
if c.hydratelocalConntrack {
//Lets cleanup all entries in cache
c.InitializeLocalCache()
iterKey := utils.ConntrackKey{}
iterNextKey := utils.ConntrackKey{}
err = goebpfmaps.GetFirstMapEntryByID(uintptr(unsafe.Pointer(&iterKey)), mapID)
if err != nil {
return
} else {
for {
iterValue := utils.ConntrackVal{}
err = goebpfmaps.GetMapEntryByID(uintptr(unsafe.Pointer(&iterKey)), uintptr(unsafe.Pointer(&iterValue)), mapID)
if err != nil {
if errors.Is(err, unix.ENOENT) {
err = nil
break
}
return
} else {
newKey := utils.ConntrackKey{}
newKey.Source_ip = iterKey.Source_ip
newKey.Source_port = iterKey.Source_port
newKey.Dest_ip = iterKey.Dest_ip
newKey.Dest_port = iterKey.Dest_port
newKey.Protocol = iterKey.Protocol
newKey.Owner_ip = iterKey.Owner_ip
c.localConntrackV4Cache[newKey] = true
}
err = goebpfmaps.GetNextMapEntryByID(uintptr(unsafe.Pointer(&iterKey)), uintptr(unsafe.Pointer(&iterNextKey)), mapID)
if errors.Is(err, unix.ENOENT) {
err = nil
break
}
if err != nil {
break
}
iterKey = iterNextKey
}
}
c.logger.Info("hydrated local conntrack cache")
c.hydratelocalConntrack = false
} else {
// Conntrack table is already hydrated from previous run
// So read from kernel conntrack table
conntrackFlows, err := netlink.ConntrackTableList(netlink.ConntrackTable, unix.AF_INET)
if err != nil {
c.logger.Info("Failed to read from conntrack table")
return
}
kernelConntrackV4Cache := make(map[utils.ConntrackKey]bool)
// Build kernel conntrack cache
for _, conntrackFlow := range conntrackFlows {
//Check fwd flow with SIP as owner
fwdFlowWithSIP := utils.ConntrackKey{}
fwdFlowWithSIP.Source_ip = utils.ConvIPv4ToInt(conntrackFlow.Forward.SrcIP)
fwdFlowWithSIP.Source_port = conntrackFlow.Forward.SrcPort
fwdFlowWithSIP.Dest_ip = utils.ConvIPv4ToInt(conntrackFlow.Forward.DstIP)
fwdFlowWithSIP.Dest_port = conntrackFlow.Forward.DstPort
fwdFlowWithSIP.Protocol = conntrackFlow.Forward.Protocol
fwdFlowWithSIP.Owner_ip = fwdFlowWithSIP.Source_ip
kernelConntrackV4Cache[fwdFlowWithSIP] = true
//Check fwd flow with DIP as owner
fwdFlowWithDIP := utils.ConntrackKey{}
fwdFlowWithDIP.Source_ip = utils.ConvIPv4ToInt(conntrackFlow.Forward.SrcIP)
fwdFlowWithDIP.Source_port = conntrackFlow.Forward.SrcPort
fwdFlowWithDIP.Dest_ip = utils.ConvIPv4ToInt(conntrackFlow.Forward.DstIP)
fwdFlowWithDIP.Dest_port = conntrackFlow.Forward.DstPort
fwdFlowWithDIP.Protocol = conntrackFlow.Forward.Protocol
fwdFlowWithDIP.Owner_ip = fwdFlowWithDIP.Dest_ip
kernelConntrackV4Cache[fwdFlowWithDIP] = true
//Dest can be VIP and pods can be on same node
destIP := net.ParseIP(conntrackFlow.Forward.DstIP.String())
revDestIP := net.ParseIP(conntrackFlow.Reverse.SrcIP.String())
if !destIP.Equal(revDestIP) {
//Check fwd flow with SIP as owner
revFlowWithSIP := utils.ConntrackKey{}
revFlowWithSIP.Source_ip = utils.ConvIPv4ToInt(conntrackFlow.Forward.SrcIP)
revFlowWithSIP.Source_port = conntrackFlow.Forward.SrcPort
revFlowWithSIP.Dest_ip = utils.ConvIPv4ToInt(conntrackFlow.Reverse.SrcIP)
revFlowWithSIP.Dest_port = conntrackFlow.Reverse.SrcPort
revFlowWithSIP.Protocol = conntrackFlow.Forward.Protocol
revFlowWithSIP.Owner_ip = revFlowWithSIP.Source_ip
kernelConntrackV4Cache[revFlowWithSIP] = true
//Check fwd flow with DIP as owner
revFlowWithDIP := utils.ConntrackKey{}
revFlowWithDIP.Source_ip = utils.ConvIPv4ToInt(conntrackFlow.Forward.SrcIP)
revFlowWithDIP.Source_port = conntrackFlow.Forward.SrcPort
revFlowWithDIP.Dest_ip = utils.ConvIPv4ToInt(conntrackFlow.Reverse.SrcIP)
revFlowWithDIP.Dest_port = conntrackFlow.Reverse.SrcPort
revFlowWithDIP.Protocol = conntrackFlow.Forward.Protocol
revFlowWithDIP.Owner_ip = revFlowWithDIP.Dest_ip
kernelConntrackV4Cache[revFlowWithDIP] = true
}
}
// Check if the local cache and kernel cache is in sync
for localConntrackEntry, _ := range c.localConntrackV4Cache {
newKey := utils.ConntrackKey{}
newKey.Source_ip = utils.ConvIPv4ToInt(utils.ConvIntToIPv4(localConntrackEntry.Source_ip))
newKey.Source_port = localConntrackEntry.Source_port
newKey.Dest_ip = utils.ConvIPv4ToInt(utils.ConvIntToIPv4(localConntrackEntry.Dest_ip))
newKey.Dest_port = localConntrackEntry.Dest_port
newKey.Protocol = localConntrackEntry.Protocol
newKey.Owner_ip = utils.ConvIPv4ToInt(utils.ConvIntToIPv4(localConntrackEntry.Owner_ip))
_, ok := kernelConntrackV4Cache[newKey]
if !ok {
// Delete the entry in local cache since kernel entry is still missing so expired case
expiredFlow := localConntrackEntry
key := fmt.Sprintf("Conntrack Key : Source IP - %s Source port - %d Dest IP - %s Dest port - %d Protocol - %d Owner IP - %s", utils.ConvIntToIPv4(expiredFlow.Source_ip).String(), expiredFlow.Source_port, utils.ConvIntToIPv4(expiredFlow.Dest_ip).String(), expiredFlow.Dest_port, expiredFlow.Protocol, utils.ConvIntToIPv4(expiredFlow.Owner_ip).String())
c.logger.Info("Conntrack cleanup", "Delete - ", key)
c.conntrackMap.DeleteMapEntry(uintptr(unsafe.Pointer(&expiredFlow)))
}
}
//c.localConntrackV4Cache = make(map[utils.ConntrackKey]bool)
c.logger.Info("Done cleanup of conntrack map")
c.hydratelocalConntrack = true
}
return
}