in pcap-cli/internal/transformer/transformer.go [596:641]
// provideStrategy installs on `transformer` the `apply` function used to
// schedule packet translation workers, selecting one of two strategies:
//
//   - ordered (`preserveOrder` or `connTracking` is set): each worker is sent
//     to the `ich` channel, in the order in which `apply` is called.
//   - unordered: each worker is handed to `translatorPool` via `Invoke`.
//
// With either strategy, if `ctx` is already done when `apply` is called, the
// worker is not scheduled, the translation is rolled back via
// `rollbackTranslation`, and the returned error joins
// `errUnavailableTranslation` with `ctx.Err()`.
func provideStrategy(
	ctx context.Context,
	transformer *PcapTransformer,
	preserveOrder, connTracking bool,
) {
	var apply func(*PcapTransformer, *pcapTranslatorWorker) error
	if preserveOrder || connTracking {
		// If ordered output is enabled, enqueue translation workers in packet capture order;
		// this will introduce some level of contention as the translation Q starts to fill (saturation):
		// if the next packet to arrive finds a full Q, this method will block until slots are available
		// or until the context is done, whichever happens first.
		// The degree of contention is proportional to the Q capacity times translation latency.
		// Order should only be used for workloads that are not network intensive.
		apply = func(t *PcapTransformer, w *pcapTranslatorWorker) error {
			// The send is a `select` case (not a bare send behind `default`)
			// so that a producer blocked on a full Q is released on cancellation
			// instead of waiting for a slot that may never become available.
			select {
			case <-ctx.Done():
				rollbackTranslation(ctx, t)
				return ctx.Err()
			case t.ich <- w:
				return nil
			}
		}
	} else {
		// If ordered output is disabled, translate packets concurrently via translator pool.
		// Order of goroutine execution is not guaranteed, which means
		// that packets will be consumed/written in non-deterministic order.
		// `serial` is available to be used for sorting PCAP files.
		apply = func(t *PcapTransformer, w *pcapTranslatorWorker) error {
			return t.translatorPool.Invoke(w)
		}
	}

	transformer.apply = func(w *pcapTranslatorWorker) error {
		// Check for cancellation before delegating, so that a context which is
		// already done always wins over scheduling the worker.
		select {
		case <-ctx.Done():
			transformerLogger.Printf("%s #:%d | translation aborted", *w.loggerPrefix, *w.serial)
			// `Apply` commits `transformer` to write the packet translation,
			// so if the context is done, commitment must be rolled back
			rollbackTranslation(ctx, transformer)
			return errors.Join(errUnavailableTranslation, ctx.Err())
		default:
			return apply(transformer, w)
		}
	}
}