func()

in pcap-cli/internal/transformer/flow_mutex.go [280:349]


// trackConnection creates a TracedFlow for the given flow and trace/span pair,
// arms a deadline timer for it, and registers it in the
// flow -> stream -> sequence maps held by fm.
//
// It returns (nil, false) when ts is nil and (tf, true) otherwise.
//
// The timer fires after trackingDeadline. If the flow's isActive flag is still
// set at that point, the callback clears it, takes lock.mu, decrements
// lock.activeRequests and, when the decremented value is not negative, calls
// lock.wg.Done(). Both steps are reported through fm.log on separate
// goroutines.
//
// NOTE(review): flowID, ts.traceID and ts.streamID are dereferenced without a
// nil check; callers are presumably expected to pass non-nil values — confirm.
func (fm *flowMutex) trackConnection(
	ctx context.Context,
	lock *flowLockCarrier,
	serial *uint64,
	flowID *uint64,
	tcpFlags *uint8,
	seq, ack *uint32,
	local bool,
	ts *traceAndSpan,
) (*TracedFlow, bool) {
	if ts == nil {
		return nil, false
	}

	// The flag starts out set; it is shared between the TracedFlow and the
	// deadline callback below.
	active := new(atomic.Bool)
	active.Store(true)

	tf := &TracedFlow{
		lock:     lock,
		serial:   serial,
		flowID:   flowID,
		ts:       ts,
		isActive: active,
	}

	onDeadline := func() {
		// Only proceed if this callback is the one that flips the flag from
		// true to false; otherwise it was already cleared and there is
		// nothing left to do here (allows termination events to continue).
		if !active.CompareAndSwap(true, false) {
			return
		}

		lock.mu.Lock()
		defer lock.mu.Unlock()

		unblockingAt := time.Now()
		unblockingMsg := sf.Format("unblocking/{0}", *ts.traceID)
		go fm.log(ctx, serial, flowID, tcpFlags, seq, ack, &unblockingAt, &unblockingMsg)

		// The counter is always decremented; the wait group is only released
		// when the decremented value did not drop below zero.
		if lock.activeRequests.Add(-1) < 0 {
			return
		}
		lock.wg.Done()

		unblockedAt := time.Now()
		unblockedMsg := sf.Format("unblocked/{0}", *ts.traceID)
		go fm.log(ctx, serial, flowID, tcpFlags, seq, ack, &unblockedAt, &unblockedMsg)
	}
	tf.unblocker = time.AfterFunc(trackingDeadline, onDeadline)

	// Level 1: FlowID -> Stream-to-[Sequence-to-TracedFlow] map.
	// When the FlowID is new, a fresh stream map is built and handed to the
	// provider before being stored.
	streams, flowKnown := fm.flowToStreamToSequenceMap.GetOrCompute(*flowID, func() STSM {
		created := haxmap.New[uint32, STTFM]()
		fm.sequenceToTracedFlowMapProvider(ctx,
			serial, flowID, tcpFlags, local, ts.streamID, seq, ack, tf, created)
		return created
	})
	if !flowKnown {
		return tf, true
	}

	// Level 2: the FlowID already had a stream map, which may not yet hold an
	// entry for this StreamID; if it does not, the provider's result is stored
	// under it.
	_, streamKnown := streams.GetOrCompute(*ts.streamID, func() STTFM {
		return fm.sequenceToTracedFlowMapProvider(ctx,
			serial, flowID, tcpFlags, local, ts.streamID, seq, ack, tf, streams)
	})
	if !streamKnown {
		return tf, true
	}

	// Level 3: the StreamID already had a Sequence-to-TracedFlow map, which
	// does not yet reference this TracedFlow; invoke the provider once more
	// (presumably it records tf under its sequence — confirm against the
	// provider).
	fm.sequenceToTracedFlowMapProvider(ctx,
		serial, flowID, tcpFlags, local, ts.streamID, seq, ack, tf, streams)

	return tf, true
}