func()

in pcap-cli/internal/transformer/flow_mutex.go [351:405]


// untrackConnection tears down the tracking state held for the flow
// identified by flowID:
//
//   - every sequence of every stream registered for the flow in
//     flowToStreamToSequenceMap is visited; a traced flow that is still
//     active is flipped to inactive and its unblocker is stopped, and its
//     trace ID is removed from traceToHttpRequestMap;
//   - the visited sequences and streams are deleted, then the flow entry
//     itself is removed from flowToStreamToSequenceMap;
//   - lock.activeRequests is drained down to zero, calling lock.wg.Done
//     once per drained request;
//   - flowID is removed from MutexMap.
//
// The context argument is ignored. Panics raised anywhere in the function
// are recovered; they are logged only when fm.Debug is set. The lock drain
// and the MutexMap removal are deferred so that they are still attempted
// when the map teardown panics half-way through.
func (fm *flowMutex) untrackConnection(
	_ context.Context,
	flowID *uint64,
	lock *flowLockCarrier,
) {
	// Registered first so that it runs last: it also catches panics raised
	// by the deferred cleanup steps below.
	defer func() {
		if r := recover(); r != nil && fm.Debug {
			transformerLogger.Println("panic@untrackConnection: ", r)
		}
	}()

	// Deferred (LIFO) so the order stays "drain lock, then forget the flow",
	// and both run even if the teardown below panics.
	defer func() {
		fm.MutexMap.Del(*flowID)
	}()
	defer func() {
		// NOTE(review): Load followed by Add is not atomic as a pair; this
		// assumes no other goroutine decrements activeRequests concurrently
		// — confirm against the unlock path.
		for lock.activeRequests.Load() > 0 {
			lock.activeRequests.Add(-1)
			lock.wg.Done()
		}
	}()

	ftsm, ok := fm.flowToStreamToSequenceMap.Get(*flowID)
	if !ok {
		return
	}

	// Keys are collected with append rather than written by index: Len() is
	// only a capacity hint, so an entry added while iterating cannot cause
	// an out-of-range write.
	streams := make([]uint32, 0, ftsm.Len())

	ftsm.ForEach(func(stream uint32, sttsm STTFM) bool {
		streams = append(streams, stream)
		sequences := make([]uint32, 0, sttsm.Len())

		sttsm.Range(func(sequence uint32, tf *TracedFlow) bool {
			sequences = append(sequences, sequence)
			if tf == nil {
				// nothing to deactivate; the key is still deleted below.
				return true
			}

			if tf.isActive.CompareAndSwap(true, false) {
				tf.unblocker.Stop()
			}

			// remove orphaned `traceID`s:
			if traceID := tf.ts.traceID; traceID != nil {
				fm.traceToHttpRequestMap.Del(*traceID)
			}
			return true
		})

		// Deletion happens after iteration, in reverse collection order.
		for i := len(sequences) - 1; i >= 0; i-- {
			sttsm.Delete(sequences[i])
		}
		return true
	})

	for i := len(streams) - 1; i >= 0; i-- {
		ftsm.Del(streams[i])
	}

	fm.flowToStreamToSequenceMap.Del(*flowID)
}