func provideWorkerPools()

in pcap-cli/internal/transformer/transformer.go [528:570]


// provideWorkerPools builds the translator and writer worker pools and stores
// them on transformer.translatorPool and transformer.writerPool.
//
// Both pools share one set of ants options: the package logger, no
// pre-allocation, blocking submission, purging disabled, and a panic handler
// that calls rollbackTranslation and logs the recovered value together with a
// stack trace.
//
// numWriters must be non-nil; it scales both pools. The translator pool is a
// single pool of 25*numWriters workers; the writer pool is a multi-pool made
// of numWriters sub-pools of 25 workers each, balanced with ants.LeastTasks.
//
// This function has no error return, so a pool that cannot be constructed is
// reported through transformerLogger and the corresponding field is left
// unassigned instead of silently storing an unusable pool.
func provideWorkerPools(ctx context.Context, transformer *PcapTransformer, numWriters *uint8) {
	// Number of workers provisioned per configured writer; used for both pools
	// so their total capacities stay in step.
	const workersPerWriter = 25

	writers := int(*numWriters)

	poolOpts := ants.Options{
		Logger:      transformerLogger,
		PreAlloc:    false,
		Nonblocking: false,
		// see: https://github.com/panjf2000/ants/blob/v2.10.0/worker_loop_queue.go#L74
		// NOTE(review): with DisablePurge set, the expiry is presumably never
		// consulted — confirm against the ants version in use.
		ExpiryDuration: time.Duration(10) * time.Second,
		DisablePurge:   true,
		PanicHandler: func(i interface{}) {
			rollbackTranslation(ctx, transformer)
			// if any go routine panics, recover and print the stack
			transformerLogger.Printf("%s panic: %+v\n%s\n",
				*transformer.loggerPrefix, i, string(debug.Stack()))
		},
	}
	poolOptions := ants.WithOptions(poolOpts)

	translatorPoolFn := func(i interface{}) {
		select {
		case <-ctx.Done():
			// The context is already finished: skip the translation and
			// only perform the rollback.
			rollbackTranslation(ctx, transformer)
			return
		default:
			if err := transformer.translatePacketFn(ctx, i); err != nil {
				transformerLogger.Printf("%s translation failed: %+v\n", *transformer.loggerPrefix, err)
				rollbackTranslation(ctx, transformer)
			}
		}
	}
	translatorPool, err := ants.NewPoolWithFunc(workersPerWriter*writers, translatorPoolFn, poolOptions)
	if err != nil {
		transformerLogger.Printf("%s failed to create translator pool: %+v\n", *transformer.loggerPrefix, err)
	} else {
		transformer.translatorPool = translatorPool
	}

	writerPoolFn := func(i interface{}) {
		transformer.writeTranslationFn(ctx, i)
	}
	// I/O ( writing ) is slow; writers get one dedicated sub-pool per
	// configured writer. Total writer capacity (writers * workersPerWriter)
	// equals the translator pool capacity.
	writerPool, err := ants.NewMultiPoolWithFunc(writers, workersPerWriter, writerPoolFn, ants.LeastTasks, poolOptions)
	if err != nil {
		transformerLogger.Printf("%s failed to create writer pool: %+v\n", *transformer.loggerPrefix, err)
	} else {
		transformer.writerPool = writerPool
	}
}