func()

in pulsar/producer_impl.go [189:283]


func (p *producer) internalCreatePartitionsProducers() error {
	partitions, err := p.client.TopicPartitions(p.topic)
	if err != nil {
		return err
	}

	oldNumPartitions := 0
	newNumPartitions := len(partitions)

	p.Lock()
	defer p.Unlock()

	oldProducers := p.producers
	oldNumPartitions = len(oldProducers)

	if oldProducers != nil {
		if oldNumPartitions == newNumPartitions {
			p.log.Debug("Number of partitions in topic has not changed")
			return nil
		}

		p.log.WithField("old_partitions", oldNumPartitions).
			WithField("new_partitions", newNumPartitions).
			Info("Changed number of partitions in topic")

	}

	p.producers = make([]Producer, newNumPartitions)

	// When for some reason (eg: forced deletion of sub partition) causes oldNumPartitions> newNumPartitions,
	// we need to rebuild the cache of new producers, otherwise the array will be out of bounds.
	if oldProducers != nil && oldNumPartitions < newNumPartitions {
		// Copy over the existing consumer instances
		for i := 0; i < oldNumPartitions; i++ {
			p.producers[i] = oldProducers[i]
		}
	}

	type ProducerError struct {
		partition int
		prod      Producer
		err       error
	}

	startPartition := oldNumPartitions
	partitionsToAdd := newNumPartitions - oldNumPartitions
	if partitionsToAdd < 0 {
		partitionsToAdd = newNumPartitions
		startPartition = 0
	}
	c := make(chan ProducerError, partitionsToAdd)

	for partitionIdx := startPartition; partitionIdx < newNumPartitions; partitionIdx++ {
		partition := partitions[partitionIdx]

		go func(partitionIdx int, partition string) {
			prod, e := newPartitionProducer(p.client, partition, p.options, partitionIdx, p.metrics)
			c <- ProducerError{
				partition: partitionIdx,
				prod:      prod,
				err:       e,
			}
		}(partitionIdx, partition)
	}

	for i := 0; i < partitionsToAdd; i++ {
		pe, ok := <-c
		if ok {
			if pe.err != nil {
				err = pe.err
			} else {
				p.producers[pe.partition] = pe.prod
			}
		}
	}

	if err != nil {
		// Since there were some failures, cleanup all the partitions that succeeded in creating the producers
		for _, producer := range p.producers {
			if producer != nil {
				producer.Close()
			}
		}
		return err
	}

	if newNumPartitions < oldNumPartitions {
		p.metrics.ProducersPartitions.Set(float64(newNumPartitions))
	} else {
		p.metrics.ProducersPartitions.Add(float64(partitionsToAdd))
	}
	atomic.StorePointer(&p.producersPtr, unsafe.Pointer(&p.producers))
	atomic.StoreUint32(&p.numPartitions, uint32(len(p.producers)))
	return nil
}