in pcap-cli/internal/transformer/transformer.go [528:570]
func provideWorkerPools(ctx context.Context, transformer *PcapTransformer, numWriters *uint8) {
poolOpts := ants.Options{
Logger: transformerLogger,
PreAlloc: false,
Nonblocking: false,
// see: https://github.com/panjf2000/ants/blob/v2.10.0/worker_loop_queue.go#L74
ExpiryDuration: time.Duration(10) * time.Second,
DisablePurge: true,
}
poolOpts.PanicHandler = func(i interface{}) {
rollbackTranslation(ctx, transformer)
// if any go routine panics, recover and print the stack
transformerLogger.Printf("%s panic: %+v\n%s\n",
*transformer.loggerPrefix, i, string(debug.Stack()))
}
poolOptions := ants.WithOptions(poolOpts)
poolSize := 25 * int(*numWriters)
translatorPoolFn := func(i interface{}) {
select {
case <-ctx.Done():
rollbackTranslation(ctx, transformer)
return
default:
if err := transformer.translatePacketFn(ctx, i); err != nil {
transformerLogger.Printf("%s translation failed: %+v\n", *transformer.loggerPrefix, err)
rollbackTranslation(ctx, transformer)
}
}
}
translatorPool, _ := ants.NewPoolWithFunc(poolSize, translatorPoolFn, poolOptions)
transformer.translatorPool = translatorPool
writerPoolFn := func(i interface{}) {
transformer.writeTranslationFn(ctx, i)
}
// I/O ( writing ) is slow; so there will be more writers than translator routines
writerPool, _ := ants.NewMultiPoolWithFunc(int(*numWriters), 25, writerPoolFn, ants.LeastTasks, poolOptions)
transformer.writerPool = writerPool
}