in pcap-cli/internal/transformer/flow_mutex.go [168:200]
// startReaper periodically sweeps fm.MutexMap and evicts flow lock carriers
// that have been idle for at least carrierDeadline. It blocks until ctx is
// done, at which point the ticker is stopped and the method returns.
//
// Why reaping is needed: the order in which packets are translated is not
// guaranteed. If every `FIN+ACK`/`RST+*` of a flow is seen before its other
// (non-termination) packets, a new carrier gets created to hold the flow lock
// and nothing would ever reap it organically. Separately, for connection
// pooling, long-lived connections that are no longer used should be dropped
// to reclaim memory.
func (fm *flowMutex) startReaper(ctx context.Context) {
	// reapIfStale is the per-entry visitor; it always returns true so that
	// the sweep keeps visiting the remaining entries.
	reapIfStale := func(id uint64, c *flowLockCarrier) bool {
		// Skip empty slots and carriers that have never been unlocked.
		// NOTE(review): lastUnlockedAt is inspected here before the carrier
		// lock is held — confirm its writers make this safe.
		if c == nil || c.lastUnlockedAt == nil {
			return true
		}
		// A carrier whose lock cannot be taken right now is in use: skip it.
		if !c.mu.TryLock() {
			return true
		}
		defer c.mu.Unlock()

		idle := time.Since(*c.lastUnlockedAt)
		if idle < carrierDeadline {
			return true
		}

		// Idle for too long: untrack the connection, drop the carrier from
		// the map, and leave a trace on stderr.
		fm.untrackConnection(ctx, &id, c)
		fm.MutexMap.Del(id)
		io.WriteString(os.Stderr,
			sf.Format("reaped flow '{0}' after {1}\n", id, idle.String()))
		return true
	}

	// One sweep per carrierDeadline interval.
	ticker := time.NewTicker(carrierDeadline)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			fm.MutexMap.ForEach(reapIfStale)
		}
	}
}