in pipe/kafka.go [226:292]
func (p *KafkaPipe) redistributeConsumers(c *topicConsumer) {
if len(c.consumers) == 0 {
return
}
/*Stop currently running consumers and wait till they terminate*/
if c.cancel != nil { // can be nil when called for the first time
c.cancel()
}
c.wg.Wait()
var nparts = len(c.partitions)
log.Debugf("Distributing %v partition(s) onto %v consumer(s)", nparts, len(c.consumers))
ctx, cancel := context.WithCancel(context.Background())
c.cancel = cancel
j := 0
partsPerConsumer := nparts / (len(c.consumers) - j)
c.wg.Add(nparts)
for i := 0; i < nparts; i++ {
c.partitions[i].childConsumer = c.consumers[j]
/* Partition 'i' messages goes to consumer 'j' */
go func(ctx context.Context, i int, outCh chan *sarama.ConsumerMessage) {
defer c.wg.Done()
if c.partitions[i].nextMsg != nil && !p.pushPartitionMsg(ctx, outCh, &c.partitions[i]) {
return
}
for {
select {
// FIXME: Config kafka and handle errors. By default errors logged only and not returned by the channel
/*
case msg := <-c.partitions[i].consumer.Errors():
select {
case c.consumers[j] <- msg:
case <-c.ctx.Done():
return
}
*/
case c.partitions[i].nextMsg = <-c.partitions[i].consumer.Messages():
//Copy data to our buffer. Key field is not used, so not copied
b := make([]byte, len(c.partitions[i].nextMsg.Value))
copy(b, c.partitions[i].nextMsg.Value)
c.partitions[i].nextMsg.Value = b
if !p.pushPartitionMsg(ctx, outCh, &c.partitions[i]) {
return
}
case <-ctx.Done():
return
}
}
}(ctx, i, c.consumers[j])
log.Debugf("Started consumer, partition=%d, push to channel=%d", i, j)
/*Try our best to equally redistribute work */
if (nparts-i-1)%partsPerConsumer == 0 {
j++
if len(c.consumers) != j {
partsPerConsumer = (nparts - i - 1) / (len(c.consumers) - j)
}
}
}
}