in pcap-cli/internal/transformer/transformer.go [350:389]
// consumeTranslations receives translations from the write queue of the writer
// identified by `index` and dispatches each one for writing, until `ctx` is done
// or the write queue is closed.
//
// It runs in 1 goroutine per writer, so it needs to be context aware to be able
// to gracefully stop, thus preventing a leak.
//
// Before returning it closes `t.writeQueuesDone[*index]`. The returned error is
// `ctx.Err()`, which is nil when the write queue was closed before the context
// was cancelled.
func (t *PcapTransformer) consumeTranslations(ctx context.Context, index *uint8) error {
	// stop reports the outcome, signals that this consumer is finished,
	// and yields the value `consumeTranslations` must return.
	stop := func(dropped uint64) error {
		transformerLogger.Printf("%s translations consumer DONE | writer:%d | dropped:%d\n", *t.loggerPrefix, *index+1, dropped)
		close(t.writeQueuesDone[*index])
		return ctx.Err()
	}

	for {
		select {
		case <-ctx.Done():
			// drop translations if context is already done
			droppedTranslations := uint64(0)
			// some translations may have been on-going when context was cancelled:
			//   - fully consume the `writerQueue` and rollback the write commitment,
			//   - block until `close` on the `writerQueue` is called by `WaitDone`
			for translation := range t.writeQueues[*index] {
				// best-effort: dump all non-written translations into `STDERR`
				if *index == 0 {
					fmt.Fprintln(os.Stderr, (*translation).String())
				}
				droppedTranslations++
				t.counter.Add(-1)
				t.wg.Done()
			}
			return stop(droppedTranslations)

		case translation, ok := <-t.writeQueues[*index]:
			if !ok {
				// The queue is closed and fully drained. A receive on a closed
				// channel never blocks and yields the zero value, so without this
				// guard `select` could pick this case over `ctx.Done()` and a nil
				// translation would be dispatched to the writer.
				return stop(0)
			}
			task := &pcapWriteTask{
				ctx:         ctx,
				writer:      index,
				translation: translation,
			}
			if t.preserveOrder || t.connTracking {
				// this is mostly blocking
				t.writeTranslation(ctx, task)
			} else {
				// this is mostly non-blocking
				t.writerPool.Invoke(task)
			}
		}
	}
}