in pcap-cli/internal/transformer/transformer.go [398:467]
func (t *PcapTransformer) WaitDone(ctx context.Context, timeout *time.Duration) {
ts := time.Now()
timer := time.NewTimer(*timeout)
writeDoneChan := make(chan struct{})
go func(t *PcapTransformer, writeDone chan struct{}) {
if !t.preserveOrder && !t.connTracking {
transformerLogger.Printf("%s gracefully terminating | tp: %d/%d | wp: %d/%d | pending:%d | deadline: %v\n",
*t.loggerPrefix, t.translatorPool.Running(), t.translatorPool.Waiting(),
t.writerPool.Running(), t.writerPool.Waiting(), t.counter.Load(), timeout)
} else {
transformerLogger.Printf("%s gracefully terminating | pending: %d | deadline: %v\n", *t.loggerPrefix, t.counter.Load(), timeout)
}
t.wg.Wait() // wait for all translations to be written
close(writeDone)
}(t, writeDoneChan)
select {
case <-timer.C:
if !t.preserveOrder && !t.connTracking {
transformerLogger.Printf("%s timed out waiting for graceful termination | tp: %d/%d | wp: %d/%d | pending:%d\n",
*t.loggerPrefix, t.translatorPool.Running(), t.translatorPool.Waiting(), t.writerPool.Running(), t.writerPool.Waiting(), t.counter.Load())
} else {
transformerLogger.Printf("%s timed out waiting for graceful termination | pending:%d\n", *t.loggerPrefix, t.counter.Load())
}
for _, writeQueue := range t.writeQueues {
close(writeQueue) // close writer channels
}
t.translator.done(ctx)
return
case <-writeDoneChan:
if !timer.Stop() {
<-timer.C
}
transformerLogger.Printf("%s STOPPED | tp: %d/%d | wp: %d/%d | pending:%d | latency: %v\n",
*t.loggerPrefix, t.translatorPool.Running(), t.translatorPool.Waiting(),
t.writerPool.Running(), t.writerPool.Waiting(), t.counter.Load(), time.Since(ts))
}
for i, writeQueue := range t.writeQueues {
// unblock `consumeTranslations` goroutines
close(writeQueue) // close writer channels
<-t.writeQueuesDone[i]
}
_timeout := *timeout - time.Since(ts)
// if order is not enforced: there are 2 worker pools to be stopped
if _timeout > 0 && !t.preserveOrder && !t.connTracking {
transformerLogger.Printf("%s releasing worker pools | deadline: %v\n", *t.loggerPrefix, _timeout)
var poolReleaserWG sync.WaitGroup
poolReleaserWG.Add(2)
go func(t *PcapTransformer, wg *sync.WaitGroup, deadline *time.Duration) {
t.translatorPool.ReleaseTimeout(*deadline)
wg.Done()
}(t, &poolReleaserWG, &_timeout)
go func(t *PcapTransformer, wg *sync.WaitGroup, deadline *time.Duration) {
t.writerPool.ReleaseTimeout(*deadline)
wg.Done()
}(t, &poolReleaserWG, &_timeout)
poolReleaserWG.Wait()
transformerLogger.Printf("%s released worker pools\n", *t.loggerPrefix)
}
// only safe to be called when nothing else is running
t.translator.done(ctx)
transformerLogger.Printf("%s TERMINATED | latency: %v\n", *t.loggerPrefix, time.Since(ts))
}