func newTransformer()

in pcap-cli/internal/transformer/transformer.go [659:763]


// newTransformer builds a `PcapTransformer` for `iface`, wires one buffered
// write queue per entry in `writers`, and starts the goroutines that drive it.
//
// Parameters:
//   - ctx: handed to the translator, the strategy/queue/pool providers and every
//     goroutine started here.
//   - iface: must be non-nil; its `Index` and `Name` are read to build the logger prefix.
//   - writers: at most 255 entries, because the writer count and the per-writer
//     consumer index are stored as `uint8`.
//   - format: must be non-nil; it is used as the key into `pcapTranslatorFmts`.
//   - preserveOrder, connTracking: when either is set, the concurrent queue is
//     used instead of the worker pools; `connTracking` implies `preserveOrder`
//     on the resulting transformer.
//
// Returns an error when `format` is nil, when there are more writers than a
// `uint8` can count, or when `newTranslator` fails.
func newTransformer(
	ctx context.Context,
	verbosity PcapVerbosity,
	iface *PcapIface,
	ephemerals *PcapEphemeralPorts,
	filters PcapFilters,
	writers []io.Writer,
	format *string,
	preserveOrder,
	connTracking bool,
	debug, compat bool,
) (IPcapTransformer, error) {
	// largest writer count representable by the `uint8` counters used below
	const maxWriters = 1<<8 - 1

	if format == nil {
		return nil, fmt.Errorf("transformer: nil format")
	}
	// without this guard `uint8(len(writers))` would wrap around, the queue
	// slices would be too short, and indexing them per writer would panic.
	if len(writers) > maxWriters {
		return nil, fmt.Errorf("transformer: too many writers: %d (max %d)", len(writers), maxWriters)
	}

	pcapFmt := pcapTranslatorFmts[*format]
	translator, err := newTranslator(ctx, verbosity, debug, iface, ephemerals, pcapFmt)
	if err != nil {
		return nil, err
	}

	loggerPrefix := fmt.Sprintf("[%d/%s] -", iface.Index, iface.Name)

	numWriters := uint8(len(writers))
	// not using `io.MultiWriter` as it writes to all writers sequentially
	writeQueues := make([]chan *fmt.Stringer, numWriters)
	writeQueuesDone := make([]chan struct{}, numWriters)
	for i := range writers {
		writeQueues[i] = make(chan *fmt.Stringer, 50)
		writeQueuesDone[i] = make(chan struct{})
	}

	// inverted index from `net.Address` to `PcapInterface`
	//   - will allow to find the correct interface in O(1) when the selected interface is `0/any`
	//   - best-effort: if the interfaces cannot be listed, the index is left empty
	var ifaces netIfaceIndex = make(map[string]*PcapIface)
	if sysIfaces, err := net.Interfaces(); err == nil {
		// O(N*M): we must visit all interfaces to get all available IPs
		//   - N: is the number of interfaces
		//   - M: is the number of IPs per interface
		for _, sysIface := range sysIfaces {
			sysAddrs, err := sysIface.Addrs()
			if err != nil {
				// interfaces whose addresses cannot be read are left out of the index
				continue
			}

			// NOTE(review): `sysIface.Index` is an `int` narrowed to `uint8` here;
			// indexes above 255 would wrap — confirm this cannot happen on target hosts.
			pcapIface := &PcapIface{
				Index: uint8(sysIface.Index),
				Name:  sysIface.Name,
				Addrs: mapset.NewSetWithSize[string](len(sysAddrs)),
			}

			// O(M): M is the number of IPs assigned to this interface
			for _, sysAddr := range sysAddrs {
				// keep only what precedes the first "/" (drops a CIDR-style suffix)
				addr, _, _ := strings.Cut(sysAddr.String(), "/")
				pcapIface.Addrs.Add(addr)
				// an address seen on more than one interface maps to the last one visited
				ifaces[addr] = pcapIface
			}
		}
	}

	// connection tracking always runs with ordering preserved
	ordered := preserveOrder || connTracking

	// same transformer, multiple strategies
	// via multiple translator implementations
	transformer := &PcapTransformer{
		wg:              new(sync.WaitGroup),
		ctx:             ctx,
		iface:           iface,
		ifaces:          ifaces,
		filters:         filters,
		ephemerals:      ephemerals,
		loggerPrefix:    &loggerPrefix,
		translator:      translator,
		writers:         writers,
		numWriters:      &numWriters,
		writeQueues:     writeQueues,
		writeQueuesDone: writeQueuesDone,
		preserveOrder:   ordered,
		connTracking:    connTracking,
		counter:         new(atomic.Int64),
		debug:           debug,
		compat:          compat,
		verbosity:       verbosity,
	}

	// the strategy provider receives the caller's original `preserveOrder` flag
	provideStrategy(ctx, transformer, preserveOrder, connTracking)

	// `preserveOrder==true` causes writes to be sequential and blocking per `io.Writer`.
	// `preserveOrder==true` although blocking at writing, does not cause `transformer.Apply` to block.
	if ordered {
		provideConcurrentQueue(ctx, connTracking, transformer, &numWriters)
		go transformer.waitForContextDone(ctx)
		go transformer.produceTranslations(ctx)
	} else {
		provideWorkerPools(ctx, transformer, &numWriters)
	}

	// spawn consumers for all `io.Writer`s
	// 1 consumer goroutine per `io.Writer`
	for i := range writeQueues {
		// declared inside the loop so every goroutine gets a pointer to its own index
		index := uint8(i)
		go transformer.consumeTranslations(ctx, &index)
	}

	transformerLogger.Printf("%s CREATED | format:%s | writers:%d\n", loggerPrefix, *format, numWriters)

	return transformer, nil
}