in pcap-cli/internal/transformer/transformer.go [659:763]
func newTransformer(
ctx context.Context,
verbosity PcapVerbosity,
iface *PcapIface,
ephemerals *PcapEphemeralPorts,
filters PcapFilters,
writers []io.Writer,
format *string,
preserveOrder,
connTracking bool,
debug, compat bool,
) (IPcapTransformer, error) {
pcapFmt := pcapTranslatorFmts[*format]
translator, err := newTranslator(ctx, verbosity, debug, iface, ephemerals, pcapFmt)
if err != nil {
return nil, err
}
loggerPrefix := fmt.Sprintf("[%d/%s] -", iface.Index, iface.Name)
numWriters := uint8(len(writers))
// not using `io.MultiWriter` as it writes to all writers sequentially
writeQueues := make([]chan *fmt.Stringer, numWriters)
writeQueuesDone := make([]chan struct{}, numWriters)
for i := range writers {
writeQueues[i] = make(chan *fmt.Stringer, 50)
writeQueuesDone[i] = make(chan struct{})
}
// inverted index from `net.Address` to `PcapInterface`
// - will allow to find the correct interface in O(1) when the selected interface is `0/any`
var ifaces netIfaceIndex = make(map[string]*PcapIface)
if _ifaces, err := net.Interfaces(); err == nil {
// O(N*M): we must visit all interfaces to get all available IPs
// - N: is the number of interfaces
// - M: is the number of IPs per interface
for _, _iface := range _ifaces {
_addrs, err := _iface.Addrs()
if err != nil {
continue
}
pcapIface := &PcapIface{
Index: uint8(_iface.Index),
Name: _iface.Name,
Addrs: mapset.NewSetWithSize[string](len(_addrs)),
}
// O(M): M is the number of IPs assigned to this interface
for _, _addr := range _addrs {
addr := strings.SplitN(_addr.String(), "/", 2)[0]
pcapIface.Addrs.Add(addr)
ifaces[addr] = pcapIface
}
}
}
// same transformer, multiple strategies
// via multiple translator implementations
transformer := &PcapTransformer{
wg: new(sync.WaitGroup),
ctx: ctx,
iface: iface,
ifaces: ifaces,
filters: filters,
ephemerals: ephemerals,
loggerPrefix: &loggerPrefix,
translator: translator,
writers: writers,
numWriters: &numWriters,
writeQueues: writeQueues,
writeQueuesDone: writeQueuesDone,
preserveOrder: preserveOrder || connTracking,
connTracking: connTracking,
counter: new(atomic.Int64),
debug: debug,
compat: compat,
verbosity: verbosity,
}
provideStrategy(ctx, transformer, preserveOrder, connTracking)
// `preserveOrder==true` causes writes to be sequential and blocking per `io.Writer`.
// `preserveOrder==true` although blocking at writting, does not cause `transformer.Apply` to block.
if preserveOrder || connTracking {
provideConcurrentQueue(ctx, connTracking, transformer, &numWriters)
go transformer.waitForContextDone(ctx)
go transformer.produceTranslations(ctx)
} else {
provideWorkerPools(ctx, transformer, &numWriters)
}
// spawn consumers for all `io.Writer`s
// 1 consumer goroutine per `io.Writer`
for i := range writeQueues {
index := uint8(i)
go transformer.consumeTranslations(ctx, &index)
}
transformerLogger.Printf("%s CREATED | format:%s | writers:%d\n", loggerPrefix, *format, numWriters)
return transformer, nil
}