in pcap-cli/internal/transformer/flow_mutex.go [428:667]
func (fm *flowMutex) lock(
ctx context.Context,
serial *uint64,
flowID *uint64,
tcpFlags *uint8,
seq, ack *uint32,
local bool,
) (
*flowLock,
TraceAndSpanProvider,
) {
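// get the lock carrier for this flow, lazily creating it on first use;
// concurrent callers sharing the same `flowID` get the same carrier.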
carrier, _ := fm.MutexMap.
GetOrCompute(*flowID,
func() *flowLockCarrier {
return fm.newFlowLockCarrier(serial, flowID)
})
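// `mu` serializes translations within the same flow;
// `wg` tracks in-flight trace-tracked requests (see `UnlockWithTraceAndSpan` below).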
mu := carrier.mu
wg := carrier.wg
// changing the order of `Wait` and `Lock` causes a deadlock
if isConnectionTermination(tcpFlags) {
tsBeforeWaiting := time.Now()
msgBeforeWaiting := "waiting"
go fm.log(ctx, serial, flowID, tcpFlags, seq, ack, &tsBeforeWaiting, &msgBeforeWaiting)
// Connection termination events must wait for the flow to stop being trace-tracked.
// Some important considerations:
// - If this flow is not trace-tracked (meaning this termination event acquires the lock ahead of any other TCP segment carrying an HTTP message with trace information), `Wait()` won't block:
// - this can happen because not only packet processing but also layer processing is done concurrently in order to complete packet translations as fast as possible.
// - the side effect of this high level of concurrency is that order of execution is not guaranteed, so trace tracking is currently done as best-effort.
// - in practice, the common scenario is for connection termination events to arrive at `lock` after the TCP segments carrying tracing information.
// - this is currently by design: doing it differently would require storing TCP events in memory, which works against keeping the memory footprint small.
// - currently the only state stored in memory is a Map from TCP flow to HTTP stream to TCP sequence that points to its corresponding trace information.
// - The same is true for TCP segments carrying HTTP responses if they acquire the lock before the ones carrying HTTP requests, but those won't clear/release trace-tracking information.
// How could this be done differently?: we'd need to store semaphores in a table indexed by `FlowID` and have termination events wait on them until the TCP segments carrying tracing information are seen.
// - this, however, relies on a wild assumption: TCP segments carrying tracing information might never arrive, and then the termination events would be blocked for no reason.
// - if that is a price we're willing to pay, this scenario could be handled by a watchdog running periodically to unblock termination events after a deadline, but that approach feels clumsy at the moment.
// - why wouldn't we want to do this?: waiting on a non-deterministic event means that a goroutine from the packet processing pool may be hijacked for no good reason.
// if execution is done (the context is cancelled):
// - do not throttle termination packet processing
// - release goroutines ASAP to allow the termination flow to continue
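// `wg` holds one unit per trace-tracked HTTP request in this flow (see `wg.Add(1)` below);
// each unit is released when the matching HTTP response is seen (`wg.Done()`) or by the request's `unblocker`,
// so `Wait()` returns once the flow is no longer trace-tracked.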
select {
case <-ctx.Done():
break
default:
wg.Wait()
}
tsAfterWaiting := time.Now()
msgAfterWaiting := "continue"
go fm.log(ctx, serial, flowID, tcpFlags, seq, ack, &tsAfterWaiting, &msgAfterWaiting)
}
// it is possible that all packets for this flow arrive at `Lock` at almost the same time:
// - which means that termination could delete the reference to `FlowLock` from `MutexMap` while non-terminating ones are still waiting for the lock
mu.Lock()
lockAcquiredTS := time.Now()
carrier.lastLockedAt = &lockAcquiredTS
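// look up any trace-tracking state already recorded for this flow;
// the returned provider maps an HTTP/2 stream ID to the trace/span captured for it (used for responses below and returned to the caller).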
tracedFlowProvider, _ := fm.getTracedFlow(flowID, seq, ack, local)
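// `_unlock` releases the flow lock and records the unlock timestamp;
// the deferred `recover` prevents a panic (e.g. unlocking an already unlocked mutex) from crashing the process, logging it to stderr instead.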
_unlock := func() {
defer func(mu *sync.Mutex) {
if err := recover(); err != nil {
io.WriteString(os.Stderr,
fmt.Sprintf("error at flow[%d]: %+v | %+v\n", *flowID, err, mu))
}
}(mu)
defer mu.Unlock()
lastUnlockedTS := time.Now()
carrier.lastUnlockedAt = &lastUnlockedTS
}
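// `UnlockAndReleaseFN` unlocks the flow and, when no requests remain in flight and the flow has not been released yet, schedules removal of its trace-tracking state;
// it reports whether this call performed the release, plus how long the lock was held.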
UnlockAndReleaseFN := func(ctx context.Context) (bool, *time.Duration) {
defer _unlock()
// many translations within the same flow may be waiting to acquire the lock;
// if multiple translations try to release (e.g. 2 * `FIN+ACK`),
// then both will release the lock, but only one must yield connection untracking.
if carrier.activeRequests.Load() == 0 &&
carrier.released.CompareAndSwap(false, true) {
// termination packets clean up the tracing info kept for this flow:
// - give some margin for all other packets to access the flow state, and then flush it.
select {
case <-ctx.Done():
// untrack connection immediately if the context is done
fm.untrackConnection(ctx, flowID, carrier)
default:
time.AfterFunc(trackingDeadline, func() {
timestamp := time.Now()
message := "untracking"
go fm.log(ctx, serial, flowID, tcpFlags, seq, ack, ×tamp, &message)
fm.untrackConnection(ctx, flowID, carrier)
})
}
lockLatency := time.Since(lockAcquiredTS)
return true, &lockLatency
}
lockLatency := time.Since(lockAcquiredTS)
return false, &lockLatency
}
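// `UnlockWithTCPFlagsFN` releases the lock; if `tcpFlags` signals connection termination,
// it delegates to `UnlockAndReleaseFN` to also schedule flow untracking.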
UnlockWithTCPFlagsFN := func(
ctx context.Context,
tcpFlags *uint8,
) (bool, *time.Duration) {
if isConnectionTermination(tcpFlags) {
return UnlockAndReleaseFN(ctx)
}
defer _unlock()
lockLatency := time.Since(lockAcquiredTS)
return false, &lockLatency
}
// this is just an alias for `UnlockWithTCPFlagsFN`,
// - but it reuses the same `tcpFlags` that were used to acquire the `lock`
UnlockFn := func(ctx context.Context) (bool, *time.Duration) {
return UnlockWithTCPFlagsFN(ctx, tcpFlags)
}
IsHTTP2FN := func() bool { return carrier.isHTTP2 }
// since all TCP data is known at this point:
// - it is possible to return a `traceID`
// - since this is guarded by the lock, it is thread-safe
// much richer analysis is also possible here.
// these are the only methods available for consumers to interact with the lock:
lock := &flowLock{
IsHTTP2: IsHTTP2FN,
Unlock: UnlockFn,
UnlockAndRelease: UnlockAndReleaseFN,
UnlockWithTCPFlags: UnlockWithTCPFlagsFN,
}
if *tcpFlags&(tcpSyn|tcpFin|tcpRst) == 0 {
// provide trace tracking only for TCP `PSH+ACK` and `ACK` segments.
// For HTTP/2, multiple streams are delivered over the same TCP connection, so:
// - it is possible to observe multiple requests and responses in the same TCP segment,
// - `unlock` must handle the scenario where the same TCP segment contains both HTTP requests and responses.
// It is possible to receive HTTP requests/responses without a `traceID`, so:
// - both must be accounted for to accurately calculate the total number of `activeRequests`:
// - this must happen regardless of `traceID` availability; otherwise, termination packets would be blocked:
// - this will not trigger a deadlock as only the termination is being delayed,
// - the `unblocker`s will eventually allow termination packets to make progress.
// The following flow-unlocking mechanism tries to:
// - prevent trace-tracking information removal by connection termination packets
// - store trace-tracking information for HTTP requests/responses:
// - that will be used to link HTTP requests to HTTP responses.
lock.UnlockWithTraceAndSpan = func(
ctx context.Context,
tcpFlags *uint8,
isHTTP2 bool,
requestStreams []uint32,
responseStreams []uint32,
requestTS map[uint32]*traceAndSpan,
responseTS map[uint32]*traceAndSpan,
) (*int64, *time.Duration) {
carrier.isHTTP2 = isHTTP2
activeRequests := carrier.activeRequests.Load()
sizeOfRequestStreams := int64(len(requestStreams))
sizeOfRequestTraceAndSpans := len(requestTS)
// handle flow `unlock` for requests
if sizeOfRequestTraceAndSpans > 0 || sizeOfRequestStreams > 0 {
for _, stream := range requestStreams {
if ts, tsAvailable := requestTS[stream]; tsAvailable {
// tracking connections allows HTTP responses without trace headers
// to be correlated with the request that originated them.
if tf, tracked := fm.trackConnection(ctx,
carrier, serial, flowID, tcpFlags, seq, ack, local, ts); tracked {
activeRequests = carrier.activeRequests.Add(1)
if activeRequests > 0 {
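// this tracked request is now in flight: add one unit to `wg`,
// which termination events `Wait()` on before releasing the flow.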
wg.Add(1)
requestTS := time.Now()
requestMsg := sf.Format("request/{0}", *ts.traceID)
go fm.log(ctx, serial, flowID, tcpFlags, seq, ack, &requestTS, &requestMsg)
} else if tf.isActive.CompareAndSwap(true, false) {
// if HTTP responses were seen before the current HTTP request:
// - do not block `FIN+ACK`/`RST` packets from making progress;
// de-activate the `unblocker` for this `TracedFlow`.
tf.unblocker.Stop()
}
}
}
}
}
sizeOfResponseStreams := int64(len(responseStreams))
sizeOfResponseTraceAndSpans := len(responseTS)
// handle flow `unlock` for responses
if sizeOfResponseTraceAndSpans > 0 || sizeOfResponseStreams > 0 {
for _, stream := range responseStreams {
if ts, tsAvailable := responseTS[stream]; tsAvailable {
if tf, traceFound := tracedFlowProvider(ts.streamID); traceFound {
activeRequests = carrier.activeRequests.Add(-1)
if activeRequests >= 0 &&
*tf.ts.traceID == *ts.traceID &&
tf.isActive.CompareAndSwap(true, false) &&
tf.unblocker.Stop() {
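// this response completes a tracked request whose `unblocker` was just stopped;
// release its `wg` unit so termination events waiting on `wg.Wait()` may proceed.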
wg.Done()
responseTS := time.Now()
responseMsg := sf.Format("response/{0}", *ts.traceID)
go fm.log(ctx, serial, flowID, tcpFlags, seq, ack, &responseTS, &responseMsg)
}
}
}
}
}
_, lockLatency := UnlockWithTCPFlagsFN(ctx, tcpFlags)
return &activeRequests, lockLatency
}
} else {
activeRequests := carrier.activeRequests.Load()
// do not provide trace tracking for TCP segments other than `PSH+ACK`/`ACK` (i.e. `SYN`/`FIN`/`RST`)
lock.UnlockWithTraceAndSpan = func(
ctx context.Context,
tcpFlags *uint8, /* tcpFlags */
_ bool, /* isHTTP2 */
_ []uint32, /* requestStreams */
_ []uint32, /* responseStreams */
_ map[uint32]*traceAndSpan, /* requestTS */
_ map[uint32]*traceAndSpan, /* responseTS */
) (*int64, *time.Duration) {
// fall back to unlocking by TCP flags
_, lockLatency := UnlockWithTCPFlagsFN(ctx, tcpFlags)
return &activeRequests, lockLatency
}
}
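// return the lock together with a `TraceAndSpanProvider`:
// given an HTTP/2 stream ID, it yields the trace/span previously recorded for this flow, if any.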
return lock, func(streamID *uint32) (*traceAndSpan, bool) {
if tf, ok := tracedFlowProvider(streamID); ok {
return tf.ts, ok
}
return nil, false
}
}
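// A rough sketch of how a caller might consume `lock` (illustrative only; the call site,
// variable names such as `sawHTTP`/`streamID`/`reqStreams`, and the choice of unlock variant
// are assumptions, not code from this package):
//
//    lock, tsProvider := fm.lock(ctx, serial, flowID, tcpFlags, seq, ack, local)
//    // ... translate the TCP segment / HTTP payload ...
//    if ts, ok := tsProvider(streamID); ok {
//        _ = ts // correlate this translation with the previously recorded trace/span
//    }
//    if sawHTTP { // segment carried HTTP data with stream/trace information
//        lock.UnlockWithTraceAndSpan(ctx, tcpFlags, isHTTP2, reqStreams, resStreams, reqTS, resTS)
//    } else {
//        lock.Unlock(ctx)
//    }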