func()

in pcap-cli/internal/transformer/transformer.go [398:467]


// WaitDone blocks until every pending translation has been written (i.e. `t.wg`
// is released) or until `timeout` elapses, whichever happens first, and then
// tears the transformer down.
//
// Graceful path: every write queue is closed and its matching `writeQueuesDone`
// signal is awaited; if neither `preserveOrder` nor `connTracking` is set and part
// of the deadline is still available, both worker pools are released using the
// remaining time; finally `t.translator.done` is invoked.
//
// Timeout path: the write queues are closed, `t.translator.done` is invoked, and
// the method returns immediately; it does not wait for queue consumers nor
// release the worker pools.
//
// `timeout` must not be nil. `ctx` is only forwarded to `t.translator.done`.
func (t *PcapTransformer) WaitDone(ctx context.Context, timeout *time.Duration) {
	ts := time.Now()
	// read the deadline once so every later use observes the same value
	deadline := *timeout

	// the worker pools are only consulted when neither ordering nor connection tracking is enabled
	usesPools := !t.preserveOrder && !t.connTracking

	// the timer is never reused, so stopping it on return is all the cleanup required
	timer := time.NewTimer(deadline)
	defer timer.Stop()

	writeDoneChan := make(chan struct{})

	// NOTE: on the timeout path this goroutine stays blocked until `t.wg` is eventually released
	go func() {
		if usesPools {
			transformerLogger.Printf("%s gracefully terminating | tp: %d/%d | wp: %d/%d | pending:%d | deadline: %v\n",
				*t.loggerPrefix, t.translatorPool.Running(), t.translatorPool.Waiting(),
				t.writerPool.Running(), t.writerPool.Waiting(), t.counter.Load(), deadline)
		} else {
			transformerLogger.Printf("%s gracefully terminating | pending: %d | deadline: %v\n", *t.loggerPrefix, t.counter.Load(), deadline)
		}
		t.wg.Wait() // wait for all translations to be written
		close(writeDoneChan)
	}()

	select {
	case <-timer.C:
		if usesPools {
			transformerLogger.Printf("%s timed out waiting for graceful termination | tp: %d/%d | wp: %d/%d | pending:%d\n",
				*t.loggerPrefix, t.translatorPool.Running(), t.translatorPool.Waiting(), t.writerPool.Running(), t.writerPool.Waiting(), t.counter.Load())
		} else {
			transformerLogger.Printf("%s timed out waiting for graceful termination | pending:%d\n", *t.loggerPrefix, t.counter.Load())
		}
		for _, writeQueue := range t.writeQueues {
			close(writeQueue) // close writer channels
		}
		t.translator.done(ctx)
		return

	case <-writeDoneChan:
		// guard pool access exactly like every other log statement in this method:
		// the pools are not consulted when order is preserved or connections are tracked
		if usesPools {
			transformerLogger.Printf("%s STOPPED | tp: %d/%d | wp: %d/%d | pending:%d | latency: %v\n",
				*t.loggerPrefix, t.translatorPool.Running(), t.translatorPool.Waiting(),
				t.writerPool.Running(), t.writerPool.Waiting(), t.counter.Load(), time.Since(ts))
		} else {
			transformerLogger.Printf("%s STOPPED | pending:%d | latency: %v\n", *t.loggerPrefix, t.counter.Load(), time.Since(ts))
		}
	}

	for i, writeQueue := range t.writeQueues {
		// unblock `consumeTranslations` goroutines
		close(writeQueue) // close writer channels
		<-t.writeQueuesDone[i]
	}

	// whatever is left of the original deadline is the budget for releasing the pools
	remaining := deadline - time.Since(ts)
	// if order is not enforced: there are 2 worker pools to be stopped
	if remaining > 0 && usesPools {
		transformerLogger.Printf("%s releasing worker pools | deadline: %v\n", *t.loggerPrefix, remaining)
		var poolReleaserWG sync.WaitGroup
		poolReleaserWG.Add(2)
		// release both pools concurrently so that they share the same remaining budget
		go func() {
			defer poolReleaserWG.Done()
			t.translatorPool.ReleaseTimeout(remaining)
		}()
		go func() {
			defer poolReleaserWG.Done()
			t.writerPool.ReleaseTimeout(remaining)
		}()
		poolReleaserWG.Wait()
		transformerLogger.Printf("%s released worker pools\n", *t.loggerPrefix)
	}

	// only safe to be called when nothing else is running
	t.translator.done(ctx)

	transformerLogger.Printf("%s TERMINATED | latency: %v\n", *t.loggerPrefix, time.Since(ts))
}