in pkg/accesslog/common/connection.go [157:205]
func (c *ConnectionManager) Start(ctx context.Context, accessLogContext *AccessLogContext) {
c.processOP.AddListener(c)
// starting to clean up the un-active connection in BPF
go func() {
ticker := time.NewTicker(cleanActiveConnectionInterval)
for {
select {
case <-ticker.C:
activeConnections := c.activeConnectionMap.Iterate()
var conID uint64
var activateConn ActiveConnection
for activeConnections.Next(&conID, &activateConn) {
// if the connection is existed, then check the next one
pid, fd := events.ParseConnectionID(conID)
if c.checkProcessFDExist(pid, fd) {
continue
}
// if the connection is not existed, then delete it
if err := c.activeConnectionMap.Delete(conID); err != nil {
if !errors.Is(err, ebpf.ErrKeyNotExist) {
log.Warnf("failed to delete the active connection, pid: %d, fd: %d, connection ID: %d, random ID: %d, error: %v",
pid, fd, conID, activateConn.RandomID, err)
}
continue
}
log.Debugf("deleted the active connection as not exist in file system, pid: %d, fd: %d, connection ID: %d, random ID: %d",
pid, fd, conID, activateConn.RandomID)
// building and send the close event
wapperedEvent := c.OnConnectionClose(&events.SocketCloseEvent{
ConnectionID: conID,
RandomID: activateConn.RandomID,
StartTime: 0,
EndTime: 0,
PID: activateConn.PID,
SocketFD: activateConn.SocketFD,
Success: 0,
})
accessLogContext.Queue.AppendKernelLog(NewKernelLogEvent(LogTypeClose, wapperedEvent))
}
case <-ctx.Done():
return
}
}
}()
}