in pkg/ebpf/conntrack/conntrack_client.go [195:345]
func (c *conntrackClient) Cleanupv6ConntrackMap() {
c.logger.Info("Check for any stale entries in the conntrack map")
bpfMapApi := &goebpfmaps.BpfMap{}
mapInfo, err := bpfMapApi.GetMapFromPinPath(CONNTRACK_MAP_PIN_PATH)
if err != nil {
c.logger.Info("Failed to get mapInfo for conntrack pinpath")
return
}
mapID := int(mapInfo.Id)
// Read from eBPF Table if local conntrack table is not cached
if c.hydratelocalConntrack {
//Lets cleanup all entries in cache
c.InitializeLocalCache()
iterKey := utils.ConntrackKeyV6{}
iterNextKey := utils.ConntrackKeyV6{}
byteSlice := utils.ConvConntrackV6ToByte(iterKey)
nextbyteSlice := utils.ConvConntrackV6ToByte(iterNextKey)
err = goebpfmaps.GetFirstMapEntryByID(uintptr(unsafe.Pointer(&byteSlice[0])), mapID)
if err != nil {
return
} else {
for {
iterValue := utils.ConntrackVal{}
err = goebpfmaps.GetMapEntryByID(uintptr(unsafe.Pointer(&byteSlice[0])), uintptr(unsafe.Pointer(&iterValue)), mapID)
if err != nil {
if errors.Is(err, unix.ENOENT) {
err = nil
break
}
return
} else {
newKey := utils.ConntrackKeyV6{}
connKey := utils.ConvByteToConntrackV6(byteSlice)
utils.CopyV6Bytes(&newKey.Source_ip, connKey.Source_ip)
utils.CopyV6Bytes(&newKey.Dest_ip, connKey.Dest_ip)
newKey.Source_port = connKey.Source_port
newKey.Dest_port = connKey.Dest_port
newKey.Protocol = connKey.Protocol
utils.CopyV6Bytes(&newKey.Owner_ip, connKey.Owner_ip)
c.localConntrackV6Cache[newKey] = true
}
err = goebpfmaps.GetNextMapEntryByID(uintptr(unsafe.Pointer(&byteSlice[0])), uintptr(unsafe.Pointer(&nextbyteSlice[0])), mapID)
if errors.Is(err, unix.ENOENT) {
err = nil
break
}
if err != nil {
break
}
copy(byteSlice, nextbyteSlice)
}
}
c.logger.Info("hydrated local conntrack cache")
c.hydratelocalConntrack = false
} else {
// Conntrack table is already hydrated from previous run
// So read from kernel conntrack table
conntrackFlows, err := netlink.ConntrackTableList(netlink.ConntrackTable, unix.AF_INET6)
if err != nil {
c.logger.Info("Failed to read from conntrack table")
return
}
kernelConntrackV6Cache := make(map[utils.ConntrackKeyV6]bool)
// Build local conntrack cache
for _, conntrackFlow := range conntrackFlows {
//Check fwd flow with SIP as owner
fwdFlowWithSIP := utils.ConntrackKeyV6{}
sip := utils.ConvIPv6ToByte(conntrackFlow.Forward.SrcIP)
copy(fwdFlowWithSIP.Source_ip[:], sip)
fwdFlowWithSIP.Source_port = conntrackFlow.Forward.SrcPort
dip := utils.ConvIPv6ToByte(conntrackFlow.Forward.DstIP)
copy(fwdFlowWithSIP.Dest_ip[:], dip)
fwdFlowWithSIP.Dest_port = conntrackFlow.Forward.DstPort
fwdFlowWithSIP.Protocol = conntrackFlow.Forward.Protocol
copy(fwdFlowWithSIP.Owner_ip[:], sip)
kernelConntrackV6Cache[fwdFlowWithSIP] = true
//Check fwd flow with DIP as owner
fwdFlowWithDIP := utils.ConntrackKeyV6{}
sip = utils.ConvIPv6ToByte(conntrackFlow.Forward.SrcIP)
copy(fwdFlowWithDIP.Source_ip[:], sip)
fwdFlowWithDIP.Source_port = conntrackFlow.Forward.SrcPort
dip = utils.ConvIPv6ToByte(conntrackFlow.Forward.DstIP)
copy(fwdFlowWithDIP.Dest_ip[:], dip)
fwdFlowWithDIP.Dest_port = conntrackFlow.Forward.DstPort
fwdFlowWithDIP.Protocol = conntrackFlow.Forward.Protocol
copy(fwdFlowWithDIP.Owner_ip[:], dip)
kernelConntrackV6Cache[fwdFlowWithDIP] = true
//Dest can be VIP and pods can be on same node
destIP := net.ParseIP(conntrackFlow.Forward.DstIP.String())
revDestIP := net.ParseIP(conntrackFlow.Reverse.SrcIP.String())
if !destIP.Equal(revDestIP) {
//Check fwd flow with SIP as owner
revFlowWithSIP := utils.ConntrackKeyV6{}
sip = utils.ConvIPv6ToByte(conntrackFlow.Forward.SrcIP)
copy(revFlowWithSIP.Source_ip[:], sip)
revFlowWithSIP.Source_port = conntrackFlow.Forward.SrcPort
dip = utils.ConvIPv6ToByte(conntrackFlow.Reverse.SrcIP)
copy(revFlowWithSIP.Dest_ip[:], dip)
revFlowWithSIP.Dest_port = conntrackFlow.Reverse.SrcPort
revFlowWithSIP.Protocol = conntrackFlow.Forward.Protocol
copy(revFlowWithSIP.Owner_ip[:], sip)
kernelConntrackV6Cache[revFlowWithSIP] = true
//Check fwd flow with DIP as owner
revFlowWithDIP := utils.ConntrackKeyV6{}
sip = utils.ConvIPv6ToByte(conntrackFlow.Forward.SrcIP)
copy(revFlowWithDIP.Source_ip[:], sip)
revFlowWithDIP.Source_port = conntrackFlow.Forward.SrcPort
dip = utils.ConvIPv6ToByte(conntrackFlow.Reverse.SrcIP)
copy(revFlowWithDIP.Dest_ip[:], dip)
revFlowWithDIP.Dest_port = conntrackFlow.Reverse.SrcPort
revFlowWithDIP.Protocol = conntrackFlow.Forward.Protocol
copy(revFlowWithDIP.Owner_ip[:], dip)
kernelConntrackV6Cache[revFlowWithDIP] = true
}
}
// Check if the local cache and kernel cache is in sync
for localConntrackEntry, _ := range c.localConntrackV6Cache {
_, ok := kernelConntrackV6Cache[localConntrackEntry]
if !ok {
// Delete the entry in local cache since kernel entry is still missing so expired case
expiredFlow := localConntrackEntry
key := fmt.Sprintf("Conntrack Key : Source IP - %s Source port - %d Dest IP - %s Dest port - %d Protocol - %d Owner IP - %s", utils.ConvByteToIPv6(expiredFlow.Source_ip).String(), expiredFlow.Source_port, utils.ConvByteToIPv6(expiredFlow.Dest_ip).String(), expiredFlow.Dest_port, expiredFlow.Protocol, utils.ConvByteToIPv6(expiredFlow.Owner_ip).String())
c.logger.Info("Conntrack cleanup", "Delete - ", key)
ceByteSlice := utils.ConvConntrackV6ToByte(expiredFlow)
c.printByteArray(ceByteSlice)
c.conntrackMap.DeleteMapEntry(uintptr(unsafe.Pointer(&ceByteSlice[0])))
}
}
//Lets cleanup all entries in cache
c.localConntrackV6Cache = make(map[utils.ConntrackKeyV6]bool)
c.logger.Info("Done cleanup of conntrack map")
c.hydratelocalConntrack = true
}
return
}