// redistributeConsumers — from pipe/kafka.go [226:292]

// redistributeConsumers (re)assigns the topic's partitions across the
// currently registered consumers and starts one forwarding goroutine per
// partition. Previously started partition goroutines are cancelled and
// awaited first, so at most one goroutine reads a given partition at a time.
//
// Partitions are spread as evenly as possible: with nparts partitions and
// m consumers, the first nparts%m consumers receive one extra partition.
// This replaces the previous scheme, which divided by zero when
// nparts < m and could produce heavily skewed assignments.
func (p *KafkaPipe) redistributeConsumers(c *topicConsumer) {
	if len(c.consumers) == 0 {
		return
	}

	/*Stop currently running consumers and wait till they terminate*/
	if c.cancel != nil { // can be nil when called for the first time
		c.cancel()
	}
	c.wg.Wait()

	var nparts = len(c.partitions)

	log.Debugf("Distributing %v partition(s) onto %v consumer(s)", nparts, len(c.consumers))

	ctx, cancel := context.WithCancel(context.Background())
	c.cancel = cancel

	// Even distribution: each consumer gets 'base' partitions, and the
	// first 'extra' consumers get one more. Quotas sum exactly to nparts,
	// so 'j' never runs past the last consumer.
	base := nparts / len(c.consumers)
	extra := nparts % len(c.consumers)

	// quota returns how many partitions consumer k should receive.
	quota := func(k int) int {
		if k < extra {
			return base + 1
		}
		return base
	}

	c.wg.Add(nparts)
	j := 0        // consumer currently being filled
	assigned := 0 // partitions assigned to consumer j so far
	for i := 0; i < nparts; i++ {
		// Advance past consumers whose quota is already met (quota can be
		// zero when there are more consumers than partitions).
		for assigned >= quota(j) {
			j++
			assigned = 0
		}

		c.partitions[i].childConsumer = c.consumers[j]
		assigned++

		/* Partition 'i' messages goes to consumer 'j' */
		go func(ctx context.Context, i int, outCh chan *sarama.ConsumerMessage) {
			defer c.wg.Done()

			// Flush the message carried over from the previous assignment,
			// if the prior goroutine was cancelled mid-push.
			if c.partitions[i].nextMsg != nil && !p.pushPartitionMsg(ctx, outCh, &c.partitions[i]) {
				return
			}

			for {
				select {
				// FIXME: Config kafka and handle errors. By default errors logged only and not returned by the channel
				/*
					case msg := <-c.partitions[i].consumer.Errors():
						select {
						case c.consumers[j] <- msg:
						case <-c.ctx.Done():
							return
						}
				*/
				case c.partitions[i].nextMsg = <-c.partitions[i].consumer.Messages():
					// A closed Messages() channel yields nil; bail out
					// instead of dereferencing it below.
					if c.partitions[i].nextMsg == nil {
						return
					}
					//Copy data to our buffer. Key field is not used, so not copied
					b := make([]byte, len(c.partitions[i].nextMsg.Value))
					copy(b, c.partitions[i].nextMsg.Value)
					c.partitions[i].nextMsg.Value = b
					if !p.pushPartitionMsg(ctx, outCh, &c.partitions[i]) {
						return
					}
				case <-ctx.Done():
					return
				}
			}
		}(ctx, i, c.consumers[j])
		log.Debugf("Started consumer, partition=%d, push to channel=%d", i, j)
	}
}