func()

in pcap-cli/internal/transformer/flow_mutex.go [168:200]


func (fm *flowMutex) startReaper(ctx context.Context) {
	// reaping is necessary as packets translations order is not guaranteed:
	// so if all `FIN+ACK`/`RST+*` are seen before other non-termination combinations within the same flow:
	//   - a new carrier will be created to hold its flow lock, and this new carrier will not be organically reaped.
	// additionally: for connection pooling, long running not-used connections should be dropped to reclaim memory.
	ticker := time.NewTicker(carrierDeadline)

	for {
		select {
		case <-ctx.Done():
			ticker.Stop()
			return
		case <-ticker.C:
			fm.MutexMap.ForEach(
				func(flowID uint64, carrier *flowLockCarrier) bool {
					if carrier == nil ||
						carrier.lastUnlockedAt == nil ||
						!carrier.mu.TryLock() {
						return true
					}
					defer carrier.mu.Unlock()
					lastUnlocked := time.Since(*carrier.lastUnlockedAt)
					if lastUnlocked >= carrierDeadline {
						fm.untrackConnection(ctx, &flowID, carrier)
						fm.MutexMap.Del(flowID)
						io.WriteString(os.Stderr,
							sf.Format("reaped flow '{0}' after {1}\n", flowID, lastUnlocked.String()))
					}
					return true
				})
		}
	}
}