func()

in pipe/kafka.go [431:479]


func (p *KafkaPipe) closeConsumer(kc *kafkaConsumer, graceful bool) error {
	p.lock.Lock()
	defer p.lock.Unlock()

	kc.log.Debugf("Closing consumer. graceful %v", graceful)

	if graceful {
		if err := kc.commitConsumerPartitionOffsets(); err != nil {
			return err
		}
	}

	t := p.consumers[kc.topic]
	c := t.consumers

	if len(c) > 1 {
		//FIXME: Avoid linear search below
		var i int
		for i = 0; i < len(c); i++ {
			if c[i] == kc.ch {
				break
			}
		}
		if i >= len(c) {
			return fmt.Errorf("consumer doesn't belong to the pipe")
		}

		/*Remove consumer by swapping with last element*/
		c[i] = c[len(c)-1]
		t.consumers = c[:len(c)-1]

		p.redistributeConsumers(p.consumers[kc.topic])
	} else {
		t.cancel()
		t.wg.Wait()

		for _, v := range t.partitions {
			err := v.consumer.Close()
			if log.EL(kc.log, err) {
				return err
			}
		}

		delete(p.consumers, kc.topic)

		kc.log.Debugf("Last consumer closed. Topic deleted")
	}
	return nil
}