func()

in pcap-cli/internal/transformer/transformer.go [350:389]


// consumeTranslations receives translations from `t.writeQueues[*index]` and
// hands each one over to the writer identified by `index`, until `ctx` is done
// or the queue is closed.
//
// On shutdown it drains whatever is left in the queue, rolling back the write
// commitment (`t.counter` and `t.wg`) of every translation that was not written,
// and then closes `t.writeQueuesDone[*index]` to signal that this consumer is finished.
//
// It returns `ctx.Err()`: non-nil when the context was cancelled, nil when the
// queue was closed while the context was still live.
func (t *PcapTransformer) consumeTranslations(ctx context.Context, index *uint8) error {
	// `consumeTranslations` runs in 1 goroutine per writer,
	// so it needs to be context aware to be able to gracefully stop, thus preventing a leak.
	queue := t.writeQueues[*index]

consume:
	for {
		select {
		case <-ctx.Done():
			// stop dispatching: remaining translations are dropped below
			break consume

		case translation, ok := <-queue:
			if !ok {
				// The queue was closed: a receive on a closed channel never blocks and
				// yields the zero value, so without this check a nil translation would
				// be dispatched (and, if `ctx` is still live, this loop would spin forever).
				break consume
			}
			task := &pcapWriteTask{
				ctx:         ctx,
				writer:      index,
				translation: translation,
			}
			if t.preserveOrder || t.connTracking {
				// this is mostly blocking
				t.writeTranslation(ctx, task)
			} else {
				// this is mostly non-blocking
				// NOTE(review): whatever `Invoke` returns is discarded here — confirm whether
				// a rejected task needs the same rollback as a dropped translation.
				t.writerPool.Invoke(task)
			}
		}
	}

	// some translations may have been on-going when the consumer was told to stop:
	//   - fully consume the `writerQueue` and rollback the write commitment,
	//   - block until `close` on the `writerQueue` is called by `WaitDone`
	//     (if the queue is already closed, the loop ends as soon as it is empty).
	droppedTranslations := uint64(0)
	for translation := range queue {
		// best-effort: dump all non-written translations into `STDERR`
		// (only the consumer of the first writer does so)
		if *index == 0 {
			fmt.Fprintln(os.Stderr, (*translation).String())
		}
		droppedTranslations++
		t.counter.Add(-1)
		t.wg.Done()
	}

	transformerLogger.Printf("%s translations consumer DONE | writer:%d | dropped:%d\n", *t.loggerPrefix, *index+1, droppedTranslations)
	close(t.writeQueuesDone[*index])
	return ctx.Err()
}