pcap-cli/internal/transformer/flow_mutex.go (492 lines of code) (raw):

// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package transformer import ( "context" "fmt" "io" "os" "strconv" "sync" "sync/atomic" "time" "github.com/Jeffail/gabs/v2" "github.com/alphadose/haxmap" sf "github.com/wissance/stringFormatter" "github.com/zhangyunhao116/skipmap" ) type ( TracedFlowProvider = func(*uint32) (*TracedFlow, bool) TraceAndSpanProvider = func(*uint32) (*traceAndSpan, bool) UnlockWithTraceAndSpan = func( context.Context, *uint8, /* TCP flags */ bool, /* isHTTP2 */ []uint32, []uint32, map[uint32]*traceAndSpan, map[uint32]*traceAndSpan, ) (*int64, *time.Duration) UnlockWithTCPFlags = func( context.Context, *uint8, /* TCP flags */ ) (bool, *time.Duration) Unlock = func(context.Context) (bool, *time.Duration) flowMutex struct { Debug bool MutexMap *haxmap.Map[uint64, *flowLockCarrier] traceToHttpRequestMap *haxmap.Map[string, *httpRequest] flowToStreamToSequenceMap FTSTSM } flowLock struct { IsHTTP2 func() bool Unlock Unlock UnlockAndRelease Unlock UnlockWithTCPFlags UnlockWithTCPFlags UnlockWithTraceAndSpan UnlockWithTraceAndSpan } flowLockCarrier struct { mu *sync.Mutex wg *sync.WaitGroup serial *uint64 flowID *uint64 released *atomic.Bool createdAt *time.Time lastLockedAt *time.Time lastUnlockedAt *time.Time isHTTP2 bool activeRequests *atomic.Int64 } TracedFlow struct { serial *uint64 flowID *uint64 lock *flowLockCarrier ts *traceAndSpan isActive *atomic.Bool unblocker *time.Timer } STTFM = *skipmap.Uint32Map[*TracedFlow] // SequenceTo[TracedFlow]Map STSM = *haxmap.Map[uint32, STTFM] // StreamTo[SequenceToTracedFlow]Map FTSTSM = *haxmap.Map[uint64, STSM] // FlowTo[StreamTo[SequenceToTracedFlow]]Map ) const ( carrierDeadline = 600 * time.Second /* 10m */ trackingDeadline = 10 * time.Second /* 10s */ ) func newFlowMutex( ctx context.Context, debug bool, flowToStreamToSequenceMap FTSTSM, traceToHttpRequestMap *haxmap.Map[string, *httpRequest], ) *flowMutex { fm := &flowMutex{ Debug: debug, MutexMap: haxmap.New[uint64, *flowLockCarrier](), flowToStreamToSequenceMap: flowToStreamToSequenceMap, traceToHttpRequestMap: traceToHttpRequestMap, } // reap orphaned `flowLockCarrier`s go fm.startReaper(ctx) // don't fear the reaper return fm } func (fm *flowMutex) log( ctx context.Context, serial *uint64, flowID *uint64, tcpFlags *uint8, seq, ack *uint32, timestamp *time.Time, message *string, ) { if !fm.Debug { return } json := gabs.New() id := ctx.Value(ContextID) logName := ctx.Value(ContextLogName) pcap, _ := json.Object("pcap") pcap.Set(id, "id") pcap.Set(logName, "ctx") serialStr := strconv.FormatUint(*serial, 10) pcap.Set(serialStr, "num") flowIDstr := strconv.FormatUint(*flowID, 10) json.Set(flowIDstr, "flow") tcpJSON, _ := json.Object("tcp") tcpJSON.Set(tcpFlagsStr[*tcpFlags], "flags") tcpJSON.Set(*seq, "seq") tcpJSON.Set(*ack, "ack") timestampJSON, _ := json.Object("timestamp") timestampJSON.Set(timestamp.Unix(), "seconds") timestampJSON.Set(timestamp.Nanosecond(), "nanos") labels, _ := json.Object("logging.googleapis.com/labels") labels.Set("pcap", "run.googleapis.com/tool") labels.Set(id, "run.googleapis.com/pcap/id") labels.Set(logName, "run.googleapis.com/pcap/name") operation, _ := json.Object("logging.googleapis.com/operation") operation.Set(sf.Format("{0}/debug", logName), "producer") operation.Set(sf.Format("{0}/flow/{1}/debug", id, flowIDstr), "id") json.Set(sf.Format("#:{0} | flow:{1} | {2}", serialStr, flowIDstr, *message), "message") io.WriteString(os.Stderr, json.String()+"\n") } func (fm *flowMutex) startReaper(ctx context.Context) { // reaping is necessary as packets translations order is not guaranteed: // so if all `FIN+ACK`/`RST+*` are seen before other non-termination combinations within the same flow: // - a new carrier will be created to hold its flow lock, and this new carrier will not be organically reaped. // additionally: for connection pooling, long running not-used connections should be dropped to reclaim memory. ticker := time.NewTicker(carrierDeadline) for { select { case <-ctx.Done(): ticker.Stop() return case <-ticker.C: fm.MutexMap.ForEach( func(flowID uint64, carrier *flowLockCarrier) bool { if carrier == nil || carrier.lastUnlockedAt == nil || !carrier.mu.TryLock() { return true } defer carrier.mu.Unlock() lastUnlocked := time.Since(*carrier.lastUnlockedAt) if lastUnlocked >= carrierDeadline { fm.untrackConnection(ctx, &flowID, carrier) fm.MutexMap.Del(flowID) io.WriteString(os.Stderr, sf.Format("reaped flow '{0}' after {1}\n", flowID, lastUnlocked.String())) } return true }) } } } func (fm *flowMutex) getTracedFlow( flowID *uint64, seq, ack *uint32, local bool, ) (TracedFlowProvider, bool) { ref := seq if local { ref = ack } streamToSequenceMap, ok := fm.flowToStreamToSequenceMap.Get(*flowID) // no HTTP/1.1 request with a `traceID` has been seen for this `flowID` if !ok { // it is also possible that packet for HTTP request for this `flowID` return func(_ *uint32) (*TracedFlow, bool) { return nil, false }, false } // [ToDo]: memoize stream to trace mapping return func(stream *uint32) (*TracedFlow, bool) { // an HTTP/1.1 request with a `traceID` has already been seen for this `flowID` var tracedFlow, lastTracedFlow *TracedFlow = nil, nil if sttsm, ok := streamToSequenceMap.Get(*stream); ok { sttsm.Range(func(r uint32, tf *TracedFlow) bool { // Loop over the map keys (ascending sequence numbers) until one greater than `sequence` is found. // HTTP/1.1 is not multiplexed, so a new request using the same TCP connection ( i/e: pooling ) // should be observed (alongside its `traceID`) with a higher sequence number than the previous one; // when the key (a sequence number) is greater than the current one, stop looping; // the previously analyzed `key` (sequence number) must be pointing to the correct `traceID`. // TL;DR: `traceID`s exist within a specific TCP sequence range, which configures a boundary. if *ref > r { tracedFlow = tf } lastTracedFlow = tf return true }) // TCP sequence number is `uint32` so it is possible // for for it to be rolled over if it gets too big. // In such case `sequence` was not greater than any `key` in the map, // so the last visited `key` might be pointing to the correct `traceID` if tracedFlow == nil { tracedFlow = lastTracedFlow } return tracedFlow, true } return nil, false }, true } func (fm *flowMutex) sequenceToTracedFlowMapProvider( ctx context.Context, serial *uint64, flowID *uint64, tcpFlags *uint8, isLocal bool, streamID *uint32, seq, ack *uint32, tf *TracedFlow, streamToSequenceMap STSM, ) STTFM { sttfm, _ := streamToSequenceMap.GetOrCompute(*streamID, func() STTFM { sequenceToTracedFlowMap := skipmap.NewUint32[*TracedFlow]() if isLocal { sequenceToTracedFlowMap.Store(*ack, tf) } else { sequenceToTracedFlowMap.Store(*seq, tf) } trackingTS := time.Now() trackingMsg := sf.Format("tracking/{0}", *tf.ts.traceID) go fm.log(ctx, serial, flowID, tcpFlags, seq, ack, &trackingTS, &trackingMsg) return sequenceToTracedFlowMap }) return sttfm } func (fm *flowMutex) trackConnection( ctx context.Context, lock *flowLockCarrier, serial *uint64, flowID *uint64, tcpFlags *uint8, seq, ack *uint32, local bool, ts *traceAndSpan, ) (*TracedFlow, bool) { if ts == nil { return nil, false } var isActive atomic.Bool tf := &TracedFlow{ lock: lock, serial: serial, flowID: flowID, ts: ts, isActive: &isActive, } isActive.Store(true) tf.unblocker = time.AfterFunc(trackingDeadline, func() { // allow termination events to continue if !isActive.CompareAndSwap(true, false) { return } lock.mu.Lock() defer lock.mu.Unlock() tsBeforeUnblocling := time.Now() msgBeforeUnblocking := sf.Format("unblocking/{0}", *ts.traceID) go fm.log(ctx, serial, flowID, tcpFlags, seq, ack, &tsBeforeUnblocling, &msgBeforeUnblocking) if lock.activeRequests.Add(-1) >= 0 { lock.wg.Done() tsAfterUnblocking := time.Now() msgAfterUnblocking := sf.Format("unblocked/{0}", *ts.traceID) go fm.log(ctx, serial, flowID, tcpFlags, seq, ack, &tsAfterUnblocking, &msgAfterUnblocking) } }) if streamToSequenceMap, loaded := fm.flowToStreamToSequenceMap. GetOrCompute(*flowID, func() STSM { streamToSequenceMap := haxmap.New[uint32, STTFM]() fm.sequenceToTracedFlowMapProvider(ctx, serial, flowID, tcpFlags, local, ts.streamID, seq, ack, tf, streamToSequenceMap) return streamToSequenceMap }); loaded { // if the `Flow-to-Sequence-MAP` already contains an entry for this `FlowID`: // - the `Stream-to-[Sequence-to-TracedFlow]-MAP` pointed by the `FlowID`: // - does not contain this `StreamID` pointing to its own `Sequence-to-TracedFlow-MAP`. if _, loaded := streamToSequenceMap.GetOrCompute(*ts.streamID, func() STTFM { return fm.sequenceToTracedFlowMapProvider(ctx, serial, flowID, tcpFlags, local, ts.streamID, seq, ack, tf, streamToSequenceMap) }); loaded { // if the `Stream-to-[Sequence-to-TracedFlow]-MAP` already contains an entry for this `StreamID`: // - the `Sequence-to-TracedFlow-MAP` pointed by the `StreamID`: // - does not contain this `sequence` pointing to its own `TracedFlow`. fm.sequenceToTracedFlowMapProvider(ctx, serial, flowID, tcpFlags, local, ts.streamID, seq, ack, tf, streamToSequenceMap) } } return tf, true } func (fm *flowMutex) untrackConnection( _ context.Context, flowID *uint64, lock *flowLockCarrier, ) { defer func() { if r := recover(); r != nil && fm.Debug { transformerLogger.Println("panic@untrackConnection: ", r) } }() if ftsm, ok := fm.flowToStreamToSequenceMap.Get(*flowID); ok { streams := make([]uint32, ftsm.Len()) streamIndex := 0 ftsm.ForEach(func(stream uint32, sttsm STTFM) bool { streams[streamIndex] = stream sequences := make([]uint32, sttsm.Len()) sequenceIndex := 0 sttsm.Range(func(sequence uint32, tf *TracedFlow) bool { sequences[sequenceIndex] = sequence if tf.isActive.CompareAndSwap(true, false) { tf.unblocker.Stop() } // remove orphaned `traceID`s: fm.traceToHttpRequestMap.Del(*tf.ts.traceID) sequenceIndex += 1 return true }) for i := sequenceIndex - 1; i >= 0; i-- { sttsm.Delete(sequences[i]) } streamIndex += 1 return true }) for i := streamIndex - 1; i >= 0; i-- { ftsm.Del(streams[i]) } fm.flowToStreamToSequenceMap.Del(*flowID) } for lock.activeRequests.Load() > 0 { lock.activeRequests.Add(-1) lock.wg.Done() } fm.MutexMap.Del(*flowID) } func (fm *flowMutex) newFlowLockCarrier( serial, flowID *uint64, ) *flowLockCarrier { var activeRequests atomic.Int64 var released atomic.Bool activeRequests.Store(0) released.Store(false) createdAt := time.Now() return &flowLockCarrier{ mu: new(sync.Mutex), wg: new(sync.WaitGroup), serial: serial, // packet that created this lock flowID: flowID, // flow that created this lock released: &released, createdAt: &createdAt, activeRequests: &activeRequests, } } func (fm *flowMutex) lock( ctx context.Context, serial *uint64, flowID *uint64, tcpFlags *uint8, seq, ack *uint32, local bool, ) ( *flowLock, TraceAndSpanProvider, ) { carrier, _ := fm.MutexMap. GetOrCompute(*flowID, func() *flowLockCarrier { return fm.newFlowLockCarrier(serial, flowID) }) mu := carrier.mu wg := carrier.wg // changing the order of `Wait` and `Lock` causes a deadlock if isConnectionTermination(tcpFlags) { tsBeforeWaiting := time.Now() msgBeforeWaiting := "waiting" go fm.log(ctx, serial, flowID, tcpFlags, seq, ack, &tsBeforeWaiting, &msgBeforeWaiting) // Connection termination events must wait for the flow to stop being trace-tracked. // some important considerations: // - If this flow is not trace-tracked ( meaning this termination event acquires the lock ahead of any other TCP segment carrying an HTTP message with trace information ) `Wait()` won't block: // - this could happen because not only packet processing but also layers processing are done concurrently in order to complete packet translations as fast as possible. // - the side effect of this high level of concurrency is that order of execution is not guaranteed and so trace tracking is currently done as best-effort. // - in practice, the common scenario is for connection termination events to arrive at `lock` after TCP segments container tracing information. // - this is currently by design: doing it differently would require to store TCP events in memory which is not great to keep memory footprint small. // - currently the only state stored in memory is: a Map from TCP flow to HTTP stream to TCP sequence that points to its corresponding trace information. // - This is `true` also for TCP segments carrying HTTP responses if those acquire the lock before the ones carrying HTTP requests, but these won't clear/release trace-tracking information. // How to do it differently?: we'd need to store semaphores in a table indexable by `FlowID` and have termination events wait on them until the TCP segments carrying tracing information are seen. // - this, however, is a wild assumption, as TCP segments carrying tracing information might never arrive and so the termination events will be locked for no reason. // - if this a price we'd like to pay, then this scenario could be handled by having a watchdog running periodically to unblock termination events after a deadline, but this approach feels clumsy at the moment. // - why we would not want to do this?: waiting on some non-deterministical event to happen means that a go-routine from the packet processing pool will be hijacked maybe for no good reason. // if execution is done: // - do not throttle termination packets processing // - release go routines ASAP to allow termination flow to continue select { case <-ctx.Done(): break default: wg.Wait() } tsAfterWaiting := time.Now() msgAfterWaiting := "continue" go fm.log(ctx, serial, flowID, tcpFlags, seq, ack, &tsAfterWaiting, &msgAfterWaiting) } // it is possible that all packets for this flow arrive to `Lock` at almost the same time: // - which means that termination could delete the reference to `FlowLock` from `MutexMap` while non terminating ones are waiting for the lock mu.Lock() lockAcquiredTS := time.Now() carrier.lastLockedAt = &lockAcquiredTS tracedFlowProvider, _ := fm.getTracedFlow(flowID, seq, ack, local) _unlock := func() { defer func(mu *sync.Mutex) { if err := recover(); err != nil { io.WriteString(os.Stderr, fmt.Sprintf("error at flow[%d]: %+v | %+v\n", *flowID, err, mu)) } }(mu) defer mu.Unlock() lastUnlockedTS := time.Now() carrier.lastUnlockedAt = &lastUnlockedTS } UnlockAndReleaseFN := func(ctx context.Context) (bool, *time.Duration) { defer _unlock() // many translations within the same flow may be waiting to acquire the lock; // if multiple translations try to release, i/e: 2*`FIN+ACK`, // then both will release the lock, but just 1 must yield connection untracking. if carrier.activeRequests.Load() == 0 && carrier.released.CompareAndSwap(false, true) { // termination packets will clean the tracing info available for each flow: // - give some margin for all other packets to access flow state, and then flush it. select { case <-ctx.Done(): // untrack connection immediately if the context is done fm.untrackConnection(ctx, flowID, carrier) default: time.AfterFunc(trackingDeadline, func() { timestamp := time.Now() message := "untracking" go fm.log(ctx, serial, flowID, tcpFlags, seq, ack, &timestamp, &message) fm.untrackConnection(ctx, flowID, carrier) }) } lockLatency := time.Since(lockAcquiredTS) return true, &lockLatency } lockLatency := time.Since(lockAcquiredTS) return false, &lockLatency } UnlockWithTCPFlagsFN := func( ctx context.Context, tcpFlags *uint8, ) (bool, *time.Duration) { if isConnectionTermination(tcpFlags) { return UnlockAndReleaseFN(ctx) } defer _unlock() lockLatency := time.Since(lockAcquiredTS) return false, &lockLatency } // this is just an alias of `UnlockWithTCPFlagsFN`, // - but it uses the same `tcpFlags` used to acquire the `lock` UnlockFn := func(ctx context.Context) (bool, *time.Duration) { return UnlockWithTCPFlagsFN(ctx, tcpFlags) } IsHTTP2FN := func() bool { return carrier.isHTTP2 } // since all TCP data is known: // - it is possible to return a `traceID` // - since this is guarded by a lock, it is thread-safe // much richer analysis is also possible // these are the only methods for consumers to interact with the lock lock := &flowLock{ IsHTTP2: IsHTTP2FN, Unlock: UnlockFn, UnlockAndRelease: UnlockAndReleaseFN, UnlockWithTCPFlags: UnlockWithTCPFlagsFN, } if *tcpFlags&(tcpSyn|tcpFin|tcpRst) == 0 { // provide trace tracking only for TCP: `PSH+ACK`, and `ACK`. // For HTTP/2 multiple streams are delivered over the same TCP connection, so: // - it is possible to observe multiple requests and responses in the same TCP segment, // - `unlock` must handle the scenario where the same TCP segment contains both HTTP requests/responses. // It is possible to receive HTTP requests/responses without `traceID`, so: // - both must be accounted to accurately calculate the total number of `activeRequests`: // - this is regardless of `traceID` being available; otherwise, termination packets are blocked: // - this will not trigger a deadlock as only the termination is being delayed, // - the `unblocker`s will eventually allow termination packets to make progress. // The following flow-unlocking mechanism tries to: // - prevent trace-tracking information removal by connection termination packets // - store trace-tracking information for HTTP requests/responses: // - that will be used to link HTTP requests to HTTP responses. lock.UnlockWithTraceAndSpan = func( ctx context.Context, tcpFlags *uint8, isHTTP2 bool, requestStreams []uint32, responseStreams []uint32, requestTS map[uint32]*traceAndSpan, responseTS map[uint32]*traceAndSpan, ) (*int64, *time.Duration) { carrier.isHTTP2 = isHTTP2 activeRequests := carrier.activeRequests.Load() sizeOfRequestStreams := int64(len(requestStreams)) sizeOfRequestTraceAndSpans := len(requestTS) // handle flow `unlock` for requests if sizeOfRequestTraceAndSpans > 0 || sizeOfRequestStreams > 0 { for _, stream := range requestStreams { if ts, tsAvailable := requestTS[stream]; tsAvailable { // tracking connections allows for HTTP responses without trace headers // to be correlated with the request that brought them to existence. if tf, tracked := fm.trackConnection(ctx, carrier, serial, flowID, tcpFlags, seq, ack, local, ts); tracked { activeRequests = carrier.activeRequests.Add(1) if activeRequests > 0 { wg.Add(1) requestTS := time.Now() requestMsg := sf.Format("request/{0}", *ts.traceID) go fm.log(ctx, serial, flowID, tcpFlags, seq, ack, &requestTS, &requestMsg) } else if tf.isActive.CompareAndSwap(true, false) { // if more HTTP responses are seen before the currentl HTTP request: // - do not block `FIN+ACK`/`RST` packets from making progress; // de-activate the `unblocker` for this `TracedFlow`. tf.unblocker.Stop() } } } } } sizeOfResponseStreams := int64(len(responseStreams)) sizeOfResponseTraceAndSpans := len(responseTS) // handle flow `unlock` for responses if sizeOfResponseTraceAndSpans > 0 || sizeOfResponseStreams > 0 { for _, stream := range responseStreams { if ts, tsAvailable := responseTS[stream]; tsAvailable { if tf, traceFound := tracedFlowProvider(ts.streamID); traceFound { activeRequests = carrier.activeRequests.Add(-1) if activeRequests >= 0 && *tf.ts.traceID == *ts.traceID && tf.isActive.CompareAndSwap(true, false) && tf.unblocker.Stop() { wg.Done() responseTS := time.Now() responseMsg := sf.Format("response/{0}", *ts.traceID) go fm.log(ctx, serial, flowID, tcpFlags, seq, ack, &responseTS, &responseMsg) } } } } } _, lockLatency := UnlockWithTCPFlagsFN(ctx, tcpFlags) return &activeRequests, lockLatency } } else { activeRequests := carrier.activeRequests.Load() // do not provide trace tracking for non TCP `PSH+ACK` lock.UnlockWithTraceAndSpan = func( ctx context.Context, tcpFlags *uint8, /* tcpFlags */ _ bool, /* isHTTP2 */ _ []uint32, /* requestStreams */ _ []uint32, /* responseStreams */ _ map[uint32]*traceAndSpan, /* requestTS */ _ map[uint32]*traceAndSpan, /* responseTS */ ) (*int64, *time.Duration) { // fallback to unlock by TCP flags _, lockLatency := UnlockWithTCPFlagsFN(ctx, tcpFlags) return &activeRequests, lockLatency } } return lock, func(streamID *uint32) (*traceAndSpan, bool) { if tf, ok := tracedFlowProvider(streamID); ok { return tf.ts, ok } return nil, false } }