func()

in pcap-cli/internal/transformer/flow_mutex.go [428:667]


// lock acquires the per-flow mutex of the flow identified by `flowID` and returns:
//   - a `*flowLock` whose function fields (`IsHTTP2`, `Unlock`, `UnlockAndRelease`,
//     `UnlockWithTCPFlags` and `UnlockWithTraceAndSpan`) are the handles used to release it,
//   - a provider that maps a stream ID to the `*traceAndSpan` found for this flow, if any.
//
// How it works:
//   - the carrier ( mutex + wait group + counters ) of the flow is fetched from `fm.MutexMap`,
//     or computed via `fm.newFlowLockCarrier` if the map does not provide one.
//   - if `tcpFlags` is a connection termination ( as reported by `isConnectionTermination` ),
//     and `ctx` is not done, the call blocks on the carrier's wait group BEFORE locking.
//   - then the call blocks on the carrier's mutex; the mutex is HELD when this method returns.
//
// Parameters:
//   - `ctx`: if already done, termination packets skip the wait-group wait.
//   - `serial`: forwarded to `fm.newFlowLockCarrier`, `fm.log` and `fm.trackConnection`.
//   - `flowID`, `tcpFlags`: both are dereferenced here, so they must not be `nil`.
//   - `seq`, `ack`, `local`: forwarded to `fm.getTracedFlow`, `fm.trackConnection` and `fm.log`.
//
// NOTE: every unlock function reaches `_unlock`, which calls `mu.Unlock()`;
// so only one of them should be called per call to `lock`.
func (fm *flowMutex) lock(
	ctx context.Context,
	serial *uint64,
	flowID *uint64,
	tcpFlags *uint8,
	seq, ack *uint32,
	local bool,
) (
	*flowLock,
	TraceAndSpanProvider,
) {
	// the 2nd value returned by `GetOrCompute` is intentionally ignored.
	carrier, _ := fm.MutexMap.
		GetOrCompute(*flowID,
			func() *flowLockCarrier {
				return fm.newFlowLockCarrier(serial, flowID)
			})

	// local aliases: `mu` guards the flow; `wg` is what termination packets wait on.
	mu := carrier.mu
	wg := carrier.wg

	// changing the order of `Wait` and `Lock` causes a deadlock
	if isConnectionTermination(tcpFlags) {
		tsBeforeWaiting := time.Now()
		msgBeforeWaiting := "waiting"
		// logging is done in its own go-routine so it does not delay this call.
		go fm.log(ctx, serial, flowID, tcpFlags, seq, ack, &tsBeforeWaiting, &msgBeforeWaiting)
		// Connection termination events must wait for the flow to stop being trace-tracked.
		// some important considerations:
		//   - If this flow is not trace-tracked ( meaning this termination event acquires the lock ahead of any other TCP segment carrying an HTTP message with trace information ) `Wait()` won't block:
		//       - this could happen because not only packet processing but also layers processing are done concurrently in order to complete packet translations as fast as possible.
		//           - the side effect of this high level of concurrency is that order of execution is not guaranteed and so trace tracking is currently done as best-effort.
		//           - in practice, the common scenario is for connection termination events to arrive at `lock` after TCP segments containing tracing information.
		//       - this is currently by design: doing it differently would require to store TCP events in memory which is not great to keep memory footprint small.
		//           - currently the only state stored in memory is: a Map from TCP flow to HTTP stream to TCP sequence that points to its corresponding trace information.
		//   - This is `true` also for TCP segments carrying HTTP responses if those acquire the lock before the ones carrying HTTP requests, but these won't clear/release trace-tracking information.
		// How to do it differently?: we'd need to store semaphores in a table indexable by `FlowID` and have termination events wait on them until the TCP segments carrying tracing information are seen.
		//   - this, however, is a wild assumption, as TCP segments carrying tracing information might never arrive and so the termination events will be locked for no reason.
		//     - if this is a price we'd like to pay, then this scenario could be handled by having a watchdog running periodically to unblock termination events after a deadline, but this approach feels clumsy at the moment.
		//     - why we would not want to do this?: waiting on some non-deterministic event to happen means that a go-routine from the packet processing pool will be hijacked maybe for no good reason.

		// if the context is done:
		//   - do not throttle termination packets processing
		//   - release go routines ASAP to allow termination flow to continue
		// NOTE: the context is checked only once; `wg.Wait()` itself is not interrupted by `ctx`.
		select {
		case <-ctx.Done():
			break
		default:
			wg.Wait()
		}

		tsAfterWaiting := time.Now()
		msgAfterWaiting := "continue"
		go fm.log(ctx, serial, flowID, tcpFlags, seq, ack, &tsAfterWaiting, &msgAfterWaiting)
	}
	// it is possible that all packets for this flow arrive to `Lock` at almost the same time:
	//   - which means that termination could delete the reference to `FlowLock` from `MutexMap` while non terminating ones are waiting for the lock
	mu.Lock()

	// from here on the flow mutex is held: it is released only by the unlock closures below.
	lockAcquiredTS := time.Now()
	carrier.lastLockedAt = &lockAcquiredTS

	// the 2nd value returned by `getTracedFlow` is intentionally ignored;
	// `tracedFlowProvider` is called later with a stream ID to look up its traced flow.
	tracedFlowProvider, _ := fm.getTracedFlow(flowID, seq, ack, local)

	// `_unlock` is the single place where the mutex is released.
	// Deferred calls run in LIFO order, so:
	//   1. `lastUnlockedAt` is recorded ( just before the mutex is released ),
	//   2. `mu.Unlock()` runs,
	//   3. the recovering handler runs: a recovered panic is written to `os.Stderr` instead of being propagated.
	_unlock := func() {
		defer func(mu *sync.Mutex) {
			if err := recover(); err != nil {
				io.WriteString(os.Stderr,
					fmt.Sprintf("error at flow[%d]: %+v | %+v\n", *flowID, err, mu))
			}
		}(mu)
		defer mu.Unlock()
		lastUnlockedTS := time.Now()
		carrier.lastUnlockedAt = &lastUnlockedTS
	}

	// `UnlockAndReleaseFN` unlocks the flow and, at most once per carrier, triggers connection untracking.
	// It returns:
	//   - `true` if this call is the one that triggered ( or scheduled ) the untracking, `false` otherwise,
	//   - the time elapsed since the lock was acquired; measured before the deferred `_unlock` runs.
	UnlockAndReleaseFN := func(ctx context.Context) (bool, *time.Duration) {
		defer _unlock()
		// many translations within the same flow may be waiting to acquire the lock;
		// if multiple translations try to release, i/e: 2*`FIN+ACK`,
		// then both will release the lock, but just 1 must yield connection untracking.
		// `CompareAndSwap` is evaluated only if `activeRequests` is `0`,
		// so `released` is not flipped while requests are still accounted as active.
		if carrier.activeRequests.Load() == 0 &&
			carrier.released.CompareAndSwap(false, true) {
			// termination packets will clean the tracing info available for each flow:
			//   - give some margin for all other packets to access flow state, and then flush it.
			select {
			case <-ctx.Done():
				// untrack connection immediately if the context is done
				fm.untrackConnection(ctx, flowID, carrier)
			default:
				// otherwise, untrack after `trackingDeadline`:
				//   - the callback runs later, outside of this call, so it does not hold the flow mutex because of this call.
				time.AfterFunc(trackingDeadline, func() {
					timestamp := time.Now()
					message := "untracking"
					go fm.log(ctx, serial, flowID, tcpFlags, seq, ack, &timestamp, &message)
					fm.untrackConnection(ctx, flowID, carrier)
				})
			}
			lockLatency := time.Since(lockAcquiredTS)
			return true, &lockLatency
		}
		lockLatency := time.Since(lockAcquiredTS)
		return false, &lockLatency
	}

	// `UnlockWithTCPFlagsFN` picks the unlock strategy from the provided `tcpFlags`:
	//   - connection termination: delegates to `UnlockAndReleaseFN`,
	//   - anything else: just unlocks and returns `false` along with the lock latency.
	// NOTE: its `tcpFlags` parameter shadows the `tcpFlags` used to acquire the lock.
	UnlockWithTCPFlagsFN := func(
		ctx context.Context,
		tcpFlags *uint8,
	) (bool, *time.Duration) {
		if isConnectionTermination(tcpFlags) {
			return UnlockAndReleaseFN(ctx)
		}
		defer _unlock()
		lockLatency := time.Since(lockAcquiredTS)
		return false, &lockLatency
	}

	// this is just an alias of `UnlockWithTCPFlagsFN`,
	//   - but it uses the same `tcpFlags` used to acquire the `lock`
	UnlockFn := func(ctx context.Context) (bool, *time.Duration) {
		return UnlockWithTCPFlagsFN(ctx, tcpFlags)
	}

	// reports the value of `carrier.isHTTP2` at the time of the call;
	// within this method it is only written by `UnlockWithTraceAndSpan` ( trace-tracking variant ).
	IsHTTP2FN := func() bool { return carrier.isHTTP2 }

	// since all TCP data is known:
	//   - it is possible to return a `traceID`
	//   - since this is guarded by a lock, it is thread-safe
	// much richer analysis is also possible

	// these are the only methods for consumers to interact with the lock
	// ( `UnlockWithTraceAndSpan` is assigned below, depending on the TCP flags )
	lock := &flowLock{
		IsHTTP2:            IsHTTP2FN,
		Unlock:             UnlockFn,
		UnlockAndRelease:   UnlockAndReleaseFN,
		UnlockWithTCPFlags: UnlockWithTCPFlagsFN,
	}

	// `true` only if none of `tcpSyn`, `tcpFin`, `tcpRst` is set in `tcpFlags`.
	if *tcpFlags&(tcpSyn|tcpFin|tcpRst) == 0 {
		// provide trace tracking only for TCP: `PSH+ACK`, and `ACK`.
		// For HTTP/2 multiple streams are delivered over the same TCP connection, so:
		//   - it is possible to observe multiple requests and responses in the same TCP segment,
		//   - `unlock` must handle the scenario where the same TCP segment contains both HTTP requests/responses.
		// It is possible to receive HTTP requests/responses without `traceID`, so:
		//   - both must be accounted to accurately calculate the total number of `activeRequests`:
		//     - this is regardless of `traceID` being available; otherwise, termination packets are blocked:
		//       - this will not trigger a deadlock as only the termination is being delayed,
		//       - the `unblocker`s will eventually allow termination packets to make progress.
		// The following flow-unlocking mechanism tries to:
		//   - prevent trace-tracking information removal by connection termination packets
		//   - store trace-tracking information for HTTP requests/responses:
		//     - that will be used to link HTTP requests to HTTP responses.
		//
		// Returns:
		//   - a pointer to the last value of `activeRequests` observed by this call,
		//   - the lock latency as reported by `UnlockWithTCPFlagsFN`.
		lock.UnlockWithTraceAndSpan = func(
			ctx context.Context,
			tcpFlags *uint8,
			isHTTP2 bool,
			requestStreams []uint32,
			responseStreams []uint32,
			requestTS map[uint32]*traceAndSpan,
			responseTS map[uint32]*traceAndSpan,
		) (*int64, *time.Duration) {
			carrier.isHTTP2 = isHTTP2
			// baseline value: returned as-is if no request/response updates the counter below.
			activeRequests := carrier.activeRequests.Load()

			sizeOfRequestStreams := int64(len(requestStreams))
			sizeOfRequestTraceAndSpans := len(requestTS)
			// handle flow `unlock` for requests
			// ( only streams listed in `requestStreams` that have an entry in `requestTS` are processed )
			if sizeOfRequestTraceAndSpans > 0 || sizeOfRequestStreams > 0 {
				for _, stream := range requestStreams {
					if ts, tsAvailable := requestTS[stream]; tsAvailable {
						// tracking connections allows for HTTP responses without trace headers
						// to be correlated with the request that brought them to existence.
						if tf, tracked := fm.trackConnection(ctx,
							carrier, serial, flowID, tcpFlags, seq, ack, local, ts); tracked {
							activeRequests = carrier.activeRequests.Add(1)
							if activeRequests > 0 {
								// one more request in flight: termination packets must wait for it.
								wg.Add(1)

								// NOTE: this `requestTS` shadows the `requestTS` map, but only within this block.
								requestTS := time.Now()
								requestMsg := sf.Format("request/{0}", *ts.traceID)
								go fm.log(ctx, serial, flowID, tcpFlags, seq, ack, &requestTS, &requestMsg)
							} else if tf.isActive.CompareAndSwap(true, false) {
								// if more HTTP responses are seen before the current HTTP request:
								//   - do not block `FIN+ACK`/`RST` packets from making progress;
								// de-activate the `unblocker` for this `TracedFlow`.
								tf.unblocker.Stop()
							}
						}
					}
				}
			}

			sizeOfResponseStreams := int64(len(responseStreams))
			sizeOfResponseTraceAndSpans := len(responseTS)
			// handle flow `unlock` for responses
			// ( only streams listed in `responseStreams` that have an entry in `responseTS` are processed )
			if sizeOfResponseTraceAndSpans > 0 || sizeOfResponseStreams > 0 {
				for _, stream := range responseStreams {
					if ts, tsAvailable := responseTS[stream]; tsAvailable {
						if tf, traceFound := tracedFlowProvider(ts.streamID); traceFound {
							// the counter is decremented for every response with a traced flow,
							// even if the conditions to call `wg.Done()` are not met.
							activeRequests = carrier.activeRequests.Add(-1)
							// `wg.Done()` is called only if ALL of these hold ( evaluated in order, short-circuiting ):
							//   - the counter did not go negative,
							//   - the response belongs to the same trace as the tracked request,
							//   - this call is the one de-activating the traced flow,
							//   - `unblocker.Stop()` reports `true`.
							if activeRequests >= 0 &&
								*tf.ts.traceID == *ts.traceID &&
								tf.isActive.CompareAndSwap(true, false) &&
								tf.unblocker.Stop() {
								wg.Done()

								// NOTE: this `responseTS` shadows the `responseTS` map, but only within this block.
								responseTS := time.Now()
								responseMsg := sf.Format("response/{0}", *ts.traceID)
								go fm.log(ctx, serial, flowID, tcpFlags, seq, ack, &responseTS, &responseMsg)
							}
						}
					}
				}
			}

			// release the flow using the `tcpFlags` provided to this call.
			_, lockLatency := UnlockWithTCPFlagsFN(ctx, tcpFlags)
			return &activeRequests, lockLatency
		}
	} else {
		// NOTE: `activeRequests` is captured here, at lock time;
		// the closure below returns a pointer to this snapshot, not to the value at unlock time.
		activeRequests := carrier.activeRequests.Load()
		// do not provide trace tracking for non TCP `PSH+ACK`
		lock.UnlockWithTraceAndSpan = func(
			ctx context.Context,
			tcpFlags *uint8, /* tcpFlags */
			_ bool, /* isHTTP2 */
			_ []uint32, /* requestStreams */
			_ []uint32, /* responseStreams */
			_ map[uint32]*traceAndSpan, /* requestTS */
			_ map[uint32]*traceAndSpan, /* responseTS */
		) (*int64, *time.Duration) {
			// fallback to unlock by TCP flags
			_, lockLatency := UnlockWithTCPFlagsFN(ctx, tcpFlags)
			return &activeRequests, lockLatency
		}
	}

	// the returned provider narrows `tracedFlowProvider`: it exposes only the `traceAndSpan` of the traced flow.
	return lock, func(streamID *uint32) (*traceAndSpan, bool) {
		if tf, ok := tracedFlowProvider(streamID); ok {
			return tf.ts, ok
		}
		return nil, false
	}
}