func()

in kafka/consumer.go [469:487]


func (c *consumer) lost(_ context.Context, _ *kgo.Client, lost map[string][]int32) {
	c.mu.Lock()
	defer c.mu.Unlock()
	var wg sync.WaitGroup
	for topic, partitions := range lost {
		for _, partition := range partitions {
			tp := topicPartition{topic: topic, partition: partition}
			if consumer, ok := c.assignments[tp]; ok {
				wg.Add(1)
				go func() {
					defer wg.Done()
					consumer.wait()
				}()
			}
			delete(c.assignments, tp)
		}
	}
	wg.Wait()
}