in pcap-cli/internal/transformer/flow_mutex.go [280:349]
func (fm *flowMutex) trackConnection(
ctx context.Context,
lock *flowLockCarrier,
serial *uint64,
flowID *uint64,
tcpFlags *uint8,
seq, ack *uint32,
local bool,
ts *traceAndSpan,
) (*TracedFlow, bool) {
if ts == nil {
return nil, false
}
var isActive atomic.Bool
tf := &TracedFlow{
lock: lock,
serial: serial,
flowID: flowID,
ts: ts,
isActive: &isActive,
}
isActive.Store(true)
tf.unblocker = time.AfterFunc(trackingDeadline, func() {
// allow termination events to continue
if !isActive.CompareAndSwap(true, false) {
return
}
lock.mu.Lock()
defer lock.mu.Unlock()
tsBeforeUnblocling := time.Now()
msgBeforeUnblocking := sf.Format("unblocking/{0}", *ts.traceID)
go fm.log(ctx, serial, flowID, tcpFlags, seq, ack, &tsBeforeUnblocling, &msgBeforeUnblocking)
if lock.activeRequests.Add(-1) >= 0 {
lock.wg.Done()
tsAfterUnblocking := time.Now()
msgAfterUnblocking := sf.Format("unblocked/{0}", *ts.traceID)
go fm.log(ctx, serial, flowID, tcpFlags, seq, ack, &tsAfterUnblocking, &msgAfterUnblocking)
}
})
if streamToSequenceMap, loaded := fm.flowToStreamToSequenceMap.
GetOrCompute(*flowID, func() STSM {
streamToSequenceMap := haxmap.New[uint32, STTFM]()
fm.sequenceToTracedFlowMapProvider(ctx,
serial, flowID, tcpFlags, local, ts.streamID, seq, ack, tf, streamToSequenceMap)
return streamToSequenceMap
}); loaded {
// if the `Flow-to-Sequence-MAP` already contains an entry for this `FlowID`:
// - the `Stream-to-[Sequence-to-TracedFlow]-MAP` pointed by the `FlowID`:
// - does not contain this `StreamID` pointing to its own `Sequence-to-TracedFlow-MAP`.
if _, loaded := streamToSequenceMap.GetOrCompute(*ts.streamID, func() STTFM {
return fm.sequenceToTracedFlowMapProvider(ctx,
serial, flowID, tcpFlags, local, ts.streamID, seq, ack, tf, streamToSequenceMap)
}); loaded {
// if the `Stream-to-[Sequence-to-TracedFlow]-MAP` already contains an entry for this `StreamID`:
// - the `Sequence-to-TracedFlow-MAP` pointed by the `StreamID`:
// - does not contain this `sequence` pointing to its own `TracedFlow`.
fm.sequenceToTracedFlowMapProvider(ctx,
serial, flowID, tcpFlags, local, ts.streamID, seq, ack, tf, streamToSequenceMap)
}
}
return tf, true
}