func()

in pkg/accesslog/common/connection.go [157:205]


func (c *ConnectionManager) Start(ctx context.Context, accessLogContext *AccessLogContext) {
	c.processOP.AddListener(c)

	// starting to clean up the un-active connection in BPF
	go func() {
		ticker := time.NewTicker(cleanActiveConnectionInterval)
		for {
			select {
			case <-ticker.C:
				activeConnections := c.activeConnectionMap.Iterate()
				var conID uint64
				var activateConn ActiveConnection
				for activeConnections.Next(&conID, &activateConn) {
					// if the connection is existed, then check the next one
					pid, fd := events.ParseConnectionID(conID)
					if c.checkProcessFDExist(pid, fd) {
						continue
					}

					// if the connection is not existed, then delete it
					if err := c.activeConnectionMap.Delete(conID); err != nil {
						if !errors.Is(err, ebpf.ErrKeyNotExist) {
							log.Warnf("failed to delete the active connection, pid: %d, fd: %d, connection ID: %d, random ID: %d, error: %v",
								pid, fd, conID, activateConn.RandomID, err)
						}
						continue
					}
					log.Debugf("deleted the active connection as not exist in file system, pid: %d, fd: %d, connection ID: %d, random ID: %d",
						pid, fd, conID, activateConn.RandomID)

					// building and send the close event
					wapperedEvent := c.OnConnectionClose(&events.SocketCloseEvent{
						ConnectionID: conID,
						RandomID:     activateConn.RandomID,
						StartTime:    0,
						EndTime:      0,
						PID:          activateConn.PID,
						SocketFD:     activateConn.SocketFD,
						Success:      0,
					})
					accessLogContext.Queue.AppendKernelLog(NewKernelLogEvent(LogTypeClose, wapperedEvent))
				}

			case <-ctx.Done():
				return
			}
		}
	}()
}