in pipe/kafka.go [431:479]
// closeConsumer removes kc from the consumers registered for kc.topic.
//
// When graceful is true the consumer's partition offsets are committed first;
// a commit failure aborts the close and is returned unchanged.
//
// If other consumers remain on the topic, kc is removed from the list and the
// topic is handed to redistributeConsumers. If kc was the last consumer, the
// topic is cancelled, its wait group is awaited, every partition consumer is
// closed and the topic is deleted from the pipe.
//
// An error is returned when the topic is not registered on the pipe, when kc
// is not one of the topic's consumers, or when closing a partition consumer
// fails. In the last case all remaining partitions are still closed and the
// topic is still deleted; the first Close error is the one returned.
//
// NOTE(review): t.wg.Wait() runs while p.lock is held; confirm that the
// goroutines tracked by the wait group never acquire p.lock.
func (p *KafkaPipe) closeConsumer(kc *kafkaConsumer, graceful bool) error {
	p.lock.Lock()
	defer p.lock.Unlock()

	kc.log.Debugf("Closing consumer. graceful %v", graceful)

	if graceful {
		if err := kc.commitConsumerPartitionOffsets(); err != nil {
			return err
		}
	}

	// A missing topic means the consumer was already closed (or never
	// registered); report it instead of dereferencing a missing entry.
	t, ok := p.consumers[kc.topic]
	if !ok {
		return fmt.Errorf("consumer doesn't belong to the pipe")
	}

	c := t.consumers

	//FIXME: Avoid linear search below
	idx := -1
	for i := range c {
		if c[i] == kc.ch {
			idx = i
			break
		}
	}
	// Verify membership even when only one consumer is left, so that a stale
	// consumer cannot tear down a topic that is owned by somebody else. An
	// empty consumer list is still torn down below, as before.
	if idx < 0 && len(c) > 0 {
		return fmt.Errorf("consumer doesn't belong to the pipe")
	}

	if len(c) > 1 {
		// Remove the consumer by swapping it with the last element.
		last := len(c) - 1
		c[idx] = c[last]
		t.consumers = c[:last]
		p.redistributeConsumers(t)
		return nil
	}

	// Last consumer: stop the topic and release all of its partitions.
	t.cancel()
	t.wg.Wait()

	// Close every partition even if one of them fails, so that a single
	// error neither leaks the remaining partition consumers nor leaves an
	// already-cancelled topic registered on the pipe.
	var firstErr error
	for _, v := range t.partitions {
		if err := v.consumer.Close(); log.EL(kc.log, err) && firstErr == nil {
			firstErr = err
		}
	}

	delete(p.consumers, kc.topic)
	kc.log.Debugf("Last consumer closed. Topic deleted")

	return firstErr
}