in kafka/consumer.go [469:487]
func (c *consumer) lost(_ context.Context, _ *kgo.Client, lost map[string][]int32) {
c.mu.Lock()
defer c.mu.Unlock()
var wg sync.WaitGroup
for topic, partitions := range lost {
for _, partition := range partitions {
tp := topicPartition{topic: topic, partition: partition}
if consumer, ok := c.assignments[tp]; ok {
wg.Add(1)
go func() {
defer wg.Done()
consumer.wait()
}()
}
delete(c.assignments, tp)
}
}
wg.Wait()
}