pcap-cli/internal/transformer/transformer.go (647 lines of code) (raw):

// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package transformer import ( "context" "errors" "fmt" "io" "log" "net" "os" "regexp" "runtime/debug" "strings" "sync" "sync/atomic" "time" mapset "github.com/deckarep/golang-set/v2" "github.com/google/gopacket" "github.com/google/gopacket/layers" "github.com/panjf2000/ants/v2" concurrently "github.com/tejzpr/ordered-concurrently/v3" ) var transformerLogger = log.New(os.Stderr, "[transformer] - ", log.LstdFlags) type ( PcapTranslatorFactory = func( context.Context, bool, // debug PcapVerbosity, *PcapIface, *PcapEphemeralPorts, ) PcapTranslator PcapTranslatorFmt uint8 PcapVerbosity uint8 PcapTranslator interface { next(context.Context, *PcapIface, *uint64, *gopacket.Packet) fmt.Stringer translateError(context.Context, error) fmt.Stringer translateLayerError(context.Context, gopacket.LayerType, error) fmt.Stringer translateEthernetLayer(context.Context, *layers.Ethernet) fmt.Stringer translateARPLayer(context.Context, *layers.ARP) fmt.Stringer translateIPv4Layer(context.Context, *layers.IPv4) fmt.Stringer translateIPv6Layer(context.Context, *layers.IPv6) fmt.Stringer translateICMPv4Layer(context.Context, *layers.ICMPv4) fmt.Stringer translateICMPv6Layer(context.Context, *layers.ICMPv6) fmt.Stringer translateICMPv6EchoLayer(context.Context, fmt.Stringer, *layers.ICMPv6Echo) fmt.Stringer translateICMPv6RedirectLayer(context.Context, fmt.Stringer, *layers.ICMPv6Redirect) fmt.Stringer translateICMPv6L3HeaderLayer(context.Context, fmt.Stringer, *layers.ICMPv6) fmt.Stringer translateUDPLayer(context.Context, *layers.UDP) fmt.Stringer translateTCPLayer(context.Context, *layers.TCP) fmt.Stringer translateTLSLayer(context.Context, *layers.TLS) fmt.Stringer translateDNSLayer(context.Context, *layers.DNS) fmt.Stringer translateErrorLayer(context.Context, *gopacket.DecodeFailure) fmt.Stringer merge(context.Context, fmt.Stringer, fmt.Stringer) (fmt.Stringer, error) finalize(context.Context, netIfaceIndex, *PcapIface, *uint64, *gopacket.Packet, bool, fmt.Stringer) (fmt.Stringer, error) write(context.Context, io.Writer, *fmt.Stringer) (int, error) done(context.Context) } pcapTranslator struct { debug bool verbosity PcapVerbosity iface *PcapIface ephemerals *PcapEphemeralPorts } netIfaceIndex map[string]*PcapIface PcapTransformer struct { ctx context.Context iface *PcapIface ifaces netIfaceIndex ephemerals *PcapEphemeralPorts loggerPrefix *string ich chan concurrently.WorkFunction och <-chan concurrently.OrderedOutput translator PcapTranslator translatorPool *ants.PoolWithFunc writerPool *ants.MultiPoolWithFunc writers []io.Writer numWriters *uint8 writeQueues []chan *fmt.Stringer writeQueuesDone []chan struct{} wg *sync.WaitGroup preserveOrder bool connTracking bool apply func(*pcapTranslatorWorker) error counter *atomic.Int64 filters PcapFilters debug, compat bool verbosity PcapVerbosity } IPcapTransformer interface { WaitDone(context.Context, *time.Duration) Apply(context.Context, *gopacket.Packet, *uint64) error } pcapWriteTask struct { ctx context.Context writer *uint8 translation *fmt.Stringer } PcapIface struct { Index uint8 Name string Addrs mapset.Set[string] } PcapEphemeralPorts struct { Min, Max uint16 } ContextKey string ) const ( ContextID = ContextKey("id") ContextLogName = ContextKey("logName") ContextVerbosity = ContextKey("verbosity") ContextDebug = ContextKey("debug") ) const ( VERBOSITY_INFO PcapVerbosity = iota VERBOSITY_DEBUG ) //go:generate stringer -type=PcapTranslatorFmt const ( TEXT PcapTranslatorFmt = iota JSON PROTO ) var pcapTranslatorFmts = map[string]PcapTranslatorFmt{ "json": JSON, "text": TEXT, "proto": PROTO, } var translators sync.Map const ( projectIdEnvVarName = "PROJECT_ID" tcpOptionsRegex = `^TCPOption\((?P<name>.+?):(?P<value>.*?)\)$` http11RequestPayloadRegexStr = `^(?P<method>.+?)\s(?P<url>.+?)\sHTTP/1\.1(?:\r?\n)?.*` http11ResponsePayloadRegexStr = `^HTTP/1\.1\s(?P<code>\d{3})\s(?P<status>.+?)(?:\r?\n)?.*` http2PrefaceRegexStr = `^PRI.+?HTTP/2\.0\r?\n\r?\nSM\r?\n\r?\n` http11LineSeparator = "\r\n" http2RawFrameRegexStr = `^\[FrameHeader\s(.+?)\]` httpContentLengthHeader = "Content-Length" cloudTraceContextHeader = "x-cloud-trace-context" traceparentHeader = "traceparent" // keeping it in sync with `h2`: // - A stream identifier of zero (0x00) is used for connection control messages http11StreamID = uint32(1) tcpSynStr = "SYN" tcpAckStr = "ACK" tcpPshStr = "PSH" tcpFinStr = "FIN" tcpRstStr = "RST" tcpUrgStr = "URG" tcpEceStr = "ECE" tcpCwrStr = "CWR" ) var ( tcpFlags = map[string]uint8{ tcpFinStr: 0b00000001, tcpSynStr: 0b00000010, tcpRstStr: 0b00000100, tcpPshStr: 0b00001000, tcpAckStr: 0b00010000, tcpUrgStr: 0b00100000, tcpEceStr: 0b01000000, tcpCwrStr: 0b10000000, } tcpFlagNil = uint8(0b00000000) tcpSynAckStr = tcpSynStr + "|" + tcpAckStr tcpSynRstStr = tcpSynStr + "|" + tcpRstStr tcpPshAckStr = tcpPshStr + "|" + tcpAckStr tcpFinAckStr = tcpFinStr + "|" + tcpAckStr tcpRstAckStr = tcpRstStr + "|" + tcpAckStr tcpUrgAckStr = tcpUrgStr + "|" + tcpAckStr tcpSynPshAckStr = tcpSynStr + "|" + tcpPshStr + "|" + tcpAckStr tcpFinRstAckStr = tcpFinStr + "|" + tcpRstStr + "|" + tcpAckStr tcpSyn = tcpFlags[tcpSynStr] tcpAck = tcpFlags[tcpAckStr] tcpPsh = tcpFlags[tcpPshStr] tcpFin = tcpFlags[tcpFinStr] tcpRst = tcpFlags[tcpRstStr] tcpUrg = tcpFlags[tcpUrgStr] tcpEce = tcpFlags[tcpEceStr] tcpCwr = tcpFlags[tcpCwrStr] tcpSynAck = tcpSyn | tcpAck tcpSynRst = tcpSyn | tcpRst tcpPshAck = tcpPsh | tcpAck tcpFinAck = tcpFin | tcpAck tcpRstAck = tcpRst | tcpAck tcpUrgAck = tcpUrg | tcpAck tcpSynPshAck = tcpSyn | tcpPsh | tcpAck tcpFinRstAck = tcpFin | tcpRst | tcpAck tcpFlagsStr = map[uint8]string{ tcpSyn: tcpSynStr, tcpAck: tcpAckStr, tcpPsh: tcpPshStr, tcpFin: tcpFinStr, tcpRst: tcpRstStr, tcpUrg: tcpUrgStr, tcpEce: tcpEceStr, tcpCwr: tcpCwrStr, tcpSynAck: tcpSynAckStr, tcpSynRst: tcpSynRstStr, tcpPshAck: tcpPshAckStr, tcpFinAck: tcpFinAckStr, tcpRstAck: tcpRstAckStr, tcpUrgAck: tcpUrgAckStr, tcpSynPshAck: tcpSynPshAckStr, tcpFinRstAck: tcpFinRstAckStr, } ) var ( tcpOptionRgx = regexp.MustCompile(tcpOptionsRegex) http11RequestPayloadRegex = regexp.MustCompile(http11RequestPayloadRegexStr) http11ResponsePayloadRegex = regexp.MustCompile(http11ResponsePayloadRegexStr) http2PrefaceRegex = regexp.MustCompile(http2PrefaceRegexStr) http2RawFrameRegex = regexp.MustCompile(http2RawFrameRegexStr) http11Separator = []byte(http11LineSeparator) http11BodySeparator = []byte(http11LineSeparator + http11LineSeparator) http11HeaderSeparator = []byte(":") httpContentLengthHeaderBytes = []byte(httpContentLengthHeader) cloudTraceContextHeaderBytes = []byte(cloudTraceContextHeader) traceparentHeaderBytes = []byte(traceparentHeader) cloudProjectID = os.Getenv(projectIdEnvVarName) cloudTracePrefix = "projects/" + cloudProjectID + "/traces/" traceAndSpanRegexStr = map[string]string{ cloudTraceContextHeader: `^(?P<trace>.+?)/(?P<span>.+?)(?:;o=.*)?$`, traceparentHeader: `^.+?-(?P<trace>.+?)-(?P<span>.+?)(?:-.+)?$`, } traceAndSpanRegex = map[string]*regexp.Regexp{ cloudTraceContextHeader: regexp.MustCompile(traceAndSpanRegexStr[cloudTraceContextHeader]), traceparentHeader: regexp.MustCompile(traceAndSpanRegexStr[traceparentHeader]), } ) var ( errUnavailableTranslation = errors.New("packet translation is unavailable") errUnavailableTranslator = errors.New("packet translator is unavailable") ) func registerTranslatorFactory( format PcapTranslatorFmt, translatorFactory PcapTranslatorFactory, ) { translators.Store(format, translatorFactory) } func (t *PcapTransformer) writeTranslation(ctx context.Context, task *pcapWriteTask) error { defer func() { t.counter.Add(-1) t.wg.Done() }() select { case <-ctx.Done(): if *task.writer == 0 { // best-effort: dump all non-written translations into `STDERR` fmt.Fprintln(os.Stderr, (*task.translation).String()) } return ctx.Err() default: _, err := t.translator.write(ctx, t.writers[*task.writer], task.translation) return err } } func (t *PcapTransformer) publishTranslation( ctx context.Context, translation *fmt.Stringer, ) error { select { case <-ctx.Done(): return ctx.Err() default: if translation == nil { return fmt.Errorf("%s publishTranslation: %w", *t.loggerPrefix, errUnavailableTranslation) } } // fan-out translation into all writers for _, translations := range t.writeQueues { // if any of the consumers' buffers is full, // the saturated/slower one will block and delay iterations. // Blocking is more likely when `preserveOrder` is enabled. translations <- translation } return nil } func (t *PcapTransformer) produceTranslation( ctx context.Context, task *pcapTranslatorWorker, ) error { translation := task.Run(ctx) if translation == nil { return nil } return t.publishTranslation(ctx, translation.(*fmt.Stringer)) } func (t *PcapTransformer) produceTranslations(ctx context.Context) { for translation := range t.och { // translations are made available in the enqueued order // consume translations and push them into translations consumers if err := t.publishTranslation(ctx, translation.Value.(*fmt.Stringer)); err != nil { rollbackTranslation(ctx, t) } } } func (t *PcapTransformer) consumeTranslations(ctx context.Context, index *uint8) error { // `consumeTranslations` runs in 1 goroutine per writer, // so it needs to be context aware to be able to gracefully stop, thus preventing a leak. for { select { case <-ctx.Done(): // drop translations if context is already done droppedTranslations := uint64(0) // some translations may have been on-going when context was cancelled: // - fully consume the `writerQueue` and rollback the write commitment, // - block until `close` on the `writerQueue` is called by `WaitDone` for translation := range t.writeQueues[*index] { // best-effort: dump all non-written translations into `STDERR` if *index == 0 { fmt.Fprintln(os.Stderr, (*translation).String()) } droppedTranslations += 1 t.counter.Add(-1) t.wg.Done() } transformerLogger.Printf("%s translations consumer DONE | writer:%d | dropped:%d\n", *t.loggerPrefix, *index+1, droppedTranslations) close(t.writeQueuesDone[*index]) return ctx.Err() case translation := <-t.writeQueues[*index]: task := &pcapWriteTask{ ctx: ctx, writer: index, translation: translation, } if t.preserveOrder || t.connTracking { // this is mostly blocking t.writeTranslation(ctx, task) } else { // this is mostly non-blocking t.writerPool.Invoke(task) } } } } func (t *PcapTransformer) waitForContextDone(ctx context.Context) error { <-ctx.Done() close(t.ich) return ctx.Err() } // returns when all packets have been transformed and written func (t *PcapTransformer) WaitDone(ctx context.Context, timeout *time.Duration) { ts := time.Now() timer := time.NewTimer(*timeout) writeDoneChan := make(chan struct{}) go func(t *PcapTransformer, writeDone chan struct{}) { if !t.preserveOrder && !t.connTracking { transformerLogger.Printf("%s gracefully terminating | tp: %d/%d | wp: %d/%d | pending:%d | deadline: %v\n", *t.loggerPrefix, t.translatorPool.Running(), t.translatorPool.Waiting(), t.writerPool.Running(), t.writerPool.Waiting(), t.counter.Load(), timeout) } else { transformerLogger.Printf("%s gracefully terminating | pending: %d | deadline: %v\n", *t.loggerPrefix, t.counter.Load(), timeout) } t.wg.Wait() // wait for all translations to be written close(writeDone) }(t, writeDoneChan) select { case <-timer.C: if !t.preserveOrder && !t.connTracking { transformerLogger.Printf("%s timed out waiting for graceful termination | tp: %d/%d | wp: %d/%d | pending:%d\n", *t.loggerPrefix, t.translatorPool.Running(), t.translatorPool.Waiting(), t.writerPool.Running(), t.writerPool.Waiting(), t.counter.Load()) } else { transformerLogger.Printf("%s timed out waiting for graceful termination | pending:%d\n", *t.loggerPrefix, t.counter.Load()) } for _, writeQueue := range t.writeQueues { close(writeQueue) // close writer channels } t.translator.done(ctx) return case <-writeDoneChan: if !timer.Stop() { <-timer.C } transformerLogger.Printf("%s STOPPED | tp: %d/%d | wp: %d/%d | pending:%d | latency: %v\n", *t.loggerPrefix, t.translatorPool.Running(), t.translatorPool.Waiting(), t.writerPool.Running(), t.writerPool.Waiting(), t.counter.Load(), time.Since(ts)) } for i, writeQueue := range t.writeQueues { // unblock `consumeTranslations` goroutines close(writeQueue) // close writer channels <-t.writeQueuesDone[i] } _timeout := *timeout - time.Since(ts) // if order is not enforced: there are 2 worker pools to be stopped if _timeout > 0 && !t.preserveOrder && !t.connTracking { transformerLogger.Printf("%s releasing worker pools | deadline: %v\n", *t.loggerPrefix, _timeout) var poolReleaserWG sync.WaitGroup poolReleaserWG.Add(2) go func(t *PcapTransformer, wg *sync.WaitGroup, deadline *time.Duration) { t.translatorPool.ReleaseTimeout(*deadline) wg.Done() }(t, &poolReleaserWG, &_timeout) go func(t *PcapTransformer, wg *sync.WaitGroup, deadline *time.Duration) { t.writerPool.ReleaseTimeout(*deadline) wg.Done() }(t, &poolReleaserWG, &_timeout) poolReleaserWG.Wait() transformerLogger.Printf("%s released worker pools\n", *t.loggerPrefix) } // only safe to be called when nothing else is running t.translator.done(ctx) transformerLogger.Printf("%s TERMINATED | latency: %v\n", *t.loggerPrefix, time.Since(ts)) } func (t *PcapTransformer) Apply(ctx context.Context, packet *gopacket.Packet, serial *uint64) error { select { case <-ctx.Done(): // reject applying transformer if context is already done. return ctx.Err() default: // applying transformer will write 1 translation into N>0 writers. t.wg.Add(int(*t.numWriters)) t.counter.Add(int64(*t.numWriters)) } // It is assumed that packets will be produced faster than translations and writing operations, so: // - process/translate packets concurrently in order to avoid blocking `gopacket` packets channel as much as possible. worker := newPcapTranslatorWorker(t.ifaces, t.iface, t.filters, serial, packet, t.translator, t.connTracking, t.compat) return t.apply(worker) } func (t *PcapTransformer) translatePacketFn(ctx context.Context, worker interface{}) error { select { case <-ctx.Done(): return ctx.Err() default: return t.produceTranslation(ctx, worker.(*pcapTranslatorWorker)) } } func (t *PcapTransformer) writeTranslationFn(ctx context.Context, task interface{}) error { return t.writeTranslation(ctx, task.(*pcapWriteTask)) } func newTranslator( ctx context.Context, verbosity PcapVerbosity, debug bool, iface *PcapIface, ephemerals *PcapEphemeralPorts, format PcapTranslatorFmt, ) (PcapTranslator, error) { if factory, ok := translators.Load(format); ok { return factory.(PcapTranslatorFactory)(ctx, debug, verbosity, iface, ephemerals), nil } return nil, errors.Join(errUnavailableTranslator, fmt.Errorf("[%d/%s] - invalid format: %v", iface.Index, iface.Name, format)) } func rollbackTranslation( _ context.Context, transformer *PcapTransformer, ) { for range *transformer.numWriters { transformer.counter.Add(-1) transformer.wg.Done() } } // if preserving packet capture order is not required, translations may be done concurrently // concurrently translating packets means that translations are not enqueded in packet capture order. // Similarly, sinking translations into files can be safely done concurrently ( in whatever order goroutines are scheduled ) func provideWorkerPools(ctx context.Context, transformer *PcapTransformer, numWriters *uint8) { poolOpts := ants.Options{ Logger: transformerLogger, PreAlloc: false, Nonblocking: false, // see: https://github.com/panjf2000/ants/blob/v2.10.0/worker_loop_queue.go#L74 ExpiryDuration: time.Duration(10) * time.Second, DisablePurge: true, } poolOpts.PanicHandler = func(i interface{}) { rollbackTranslation(ctx, transformer) // if any go routine panics, recover and print the stack transformerLogger.Printf("%s panic: %+v\n%s\n", *transformer.loggerPrefix, i, string(debug.Stack())) } poolOptions := ants.WithOptions(poolOpts) poolSize := 25 * int(*numWriters) translatorPoolFn := func(i interface{}) { select { case <-ctx.Done(): rollbackTranslation(ctx, transformer) return default: if err := transformer.translatePacketFn(ctx, i); err != nil { transformerLogger.Printf("%s translation failed: %+v\n", *transformer.loggerPrefix, err) rollbackTranslation(ctx, transformer) } } } translatorPool, _ := ants.NewPoolWithFunc(poolSize, translatorPoolFn, poolOptions) transformer.translatorPool = translatorPool writerPoolFn := func(i interface{}) { transformer.writeTranslationFn(ctx, i) } // I/O ( writing ) is slow; so there will be more writers than translator routines writerPool, _ := ants.NewMultiPoolWithFunc(int(*numWriters), 25, writerPoolFn, ants.LeastTasks, poolOptions) transformer.writerPool = writerPool } func provideConcurrentQueue(ctx context.Context, connTrack bool, transformer *PcapTransformer, numWriters *uint8) { // if connection tracking is enabled, the whole process is synchronous, // so the following considerations apply: // - should be enabled only in combination with a very specific filter // - should not be used when high traffic rate is expected: // non-concurrent processing is slower, so more memory is required to buffer packets poolSize := 1 if !connTrack { // when `poolSize` is greater than 1: even when written in order, // packets are processed concurrently which makes connection tracking // a very complex process to be done on-the-fly as order of packet translation // is not guaranteed; introducing contention may slow down the whole process. poolSize = 30 * int(*numWriters) } ochOpts := &concurrently.Options{ PoolSize: poolSize, OutChannelBuffer: 100, } transformer.ich = make(chan concurrently.WorkFunction, 100) transformer.och = concurrently.Process(ctx, transformer.ich, ochOpts) } func provideStrategy( ctx context.Context, transformer *PcapTransformer, preserveOrder, connTracking bool, ) { var apply func(*PcapTransformer, *pcapTranslatorWorker) error = nil if preserveOrder || connTracking { // If ordered output is enabled, enqueue translation workers in packet capture order; // this will introduce some level of contention as the translation Q starts to fill (saturation): // if the next packet to arrive finds a full Q, this method will block until slots are available. // The degree of contention is proportial to the Q capacity times translation latency. // Order should only be used for not network intersive workloads. apply = func(t *PcapTransformer, w *pcapTranslatorWorker) error { select { case <-ctx.Done(): rollbackTranslation(ctx, transformer) return ctx.Err() default: t.ich <- w } return nil } } else { // if ordered output is disabled, translate packets concurrently via translator pool. // Order of gorouting execution is not guaranteed, which means // that packets will be consumed/written in non-deterministic order. // `serial` is aviailable to be used for sorting PCAP files. apply = func(t *PcapTransformer, w *pcapTranslatorWorker) error { return t.translatorPool.Invoke(w) } } transformer.apply = func(w *pcapTranslatorWorker) error { select { case <-ctx.Done(): transformerLogger.Printf("%s #:%d | translation aborted", *w.loggerPrefix, *w.serial) // `Apply` commits `transformer` to write the packet translation, // so if the context is done, commitment must be rolled back rollbackTranslation(ctx, transformer) return errors.Join(errUnavailableTranslation, ctx.Err()) default: return apply(transformer, w) } } } func newPcapTranslator( ctx context.Context, debug bool, verbosity PcapVerbosity, iface *PcapIface, ephemerals *PcapEphemeralPorts, ) *pcapTranslator { return &pcapTranslator{ debug: debug, verbosity: verbosity, iface: iface, ephemerals: ephemerals, } } // transformers get instances of `io.Writer` instead of `pcap.PcapWriter` to prevent closing. func newTransformer( ctx context.Context, verbosity PcapVerbosity, iface *PcapIface, ephemerals *PcapEphemeralPorts, filters PcapFilters, writers []io.Writer, format *string, preserveOrder, connTracking bool, debug, compat bool, ) (IPcapTransformer, error) { pcapFmt := pcapTranslatorFmts[*format] translator, err := newTranslator(ctx, verbosity, debug, iface, ephemerals, pcapFmt) if err != nil { return nil, err } loggerPrefix := fmt.Sprintf("[%d/%s] -", iface.Index, iface.Name) numWriters := uint8(len(writers)) // not using `io.MultiWriter` as it writes to all writers sequentially writeQueues := make([]chan *fmt.Stringer, numWriters) writeQueuesDone := make([]chan struct{}, numWriters) for i := range writers { writeQueues[i] = make(chan *fmt.Stringer, 50) writeQueuesDone[i] = make(chan struct{}) } // inverted index from `net.Address` to `PcapInterface` // - will allow to find the correct interface in O(1) when the selected interface is `0/any` var ifaces netIfaceIndex = make(map[string]*PcapIface) if _ifaces, err := net.Interfaces(); err == nil { // O(N*M): we must visit all interfaces to get all available IPs // - N: is the number of interfaces // - M: is the number of IPs per interface for _, _iface := range _ifaces { _addrs, err := _iface.Addrs() if err != nil { continue } pcapIface := &PcapIface{ Index: uint8(_iface.Index), Name: _iface.Name, Addrs: mapset.NewSetWithSize[string](len(_addrs)), } // O(M): M is the number of IPs assigned to this interface for _, _addr := range _addrs { addr := strings.SplitN(_addr.String(), "/", 2)[0] pcapIface.Addrs.Add(addr) ifaces[addr] = pcapIface } } } // same transformer, multiple strategies // via multiple translator implementations transformer := &PcapTransformer{ wg: new(sync.WaitGroup), ctx: ctx, iface: iface, ifaces: ifaces, filters: filters, ephemerals: ephemerals, loggerPrefix: &loggerPrefix, translator: translator, writers: writers, numWriters: &numWriters, writeQueues: writeQueues, writeQueuesDone: writeQueuesDone, preserveOrder: preserveOrder || connTracking, connTracking: connTracking, counter: new(atomic.Int64), debug: debug, compat: compat, verbosity: verbosity, } provideStrategy(ctx, transformer, preserveOrder, connTracking) // `preserveOrder==true` causes writes to be sequential and blocking per `io.Writer`. // `preserveOrder==true` although blocking at writting, does not cause `transformer.Apply` to block. if preserveOrder || connTracking { provideConcurrentQueue(ctx, connTracking, transformer, &numWriters) go transformer.waitForContextDone(ctx) go transformer.produceTranslations(ctx) } else { provideWorkerPools(ctx, transformer, &numWriters) } // spawn consumers for all `io.Writer`s // 1 consumer goroutine per `io.Writer` for i := range writeQueues { index := uint8(i) go transformer.consumeTranslations(ctx, &index) } transformerLogger.Printf("%s CREATED | format:%s | writers:%d\n", loggerPrefix, *format, numWriters) return transformer, nil } func NewOrderedTransformer( ctx context.Context, verbosity PcapVerbosity, iface *PcapIface, ephemerals *PcapEphemeralPorts, filters PcapFilters, writers []io.Writer, format *string, debug, compat bool, ) (IPcapTransformer, error) { return newTransformer(ctx, verbosity, iface, ephemerals, filters, writers, format, true /* preserveOrder */, false /* connTracking */, debug, compat) } func NewConnTrackTransformer( ctx context.Context, verbosity PcapVerbosity, iface *PcapIface, ephemerals *PcapEphemeralPorts, filters PcapFilters, writers []io.Writer, format *string, debug, compat bool, ) (IPcapTransformer, error) { return newTransformer(ctx, verbosity, iface, ephemerals, filters, writers, format, true /* preserveOrder */, true /* connTracking */, debug, compat) } func NewDebugTransformer( ctx context.Context, verbosity PcapVerbosity, iface *PcapIface, ephemerals *PcapEphemeralPorts, filters PcapFilters, writers []io.Writer, format *string, compat bool, ) (IPcapTransformer, error) { return newTransformer(ctx, verbosity, iface, ephemerals, filters, writers, format, false /* preserveOrder */, false /* connTracking */, true /* debug */, compat) } func NewTransformer( ctx context.Context, verbosity PcapVerbosity, iface *PcapIface, ephemerals *PcapEphemeralPorts, filters PcapFilters, writers []io.Writer, format *string, debug, compat bool, ) (IPcapTransformer, error) { return newTransformer(ctx, verbosity, iface, ephemerals, filters, writers, format, false /* preserveOrder */, false /* connTracking */, debug, compat) }