in kafka/consumer.go [494:507]
func (c *consumer) close() {
c.mu.Lock()
defer c.mu.Unlock()
var wg sync.WaitGroup
for tp, consumer := range c.assignments {
delete(c.assignments, tp)
wg.Add(1)
go func(c *pc) {
defer wg.Done()
c.wait()
}(consumer)
}
wg.Wait()
}