in pulsar/producer_impl.go [189:283]
func (p *producer) internalCreatePartitionsProducers() error {
partitions, err := p.client.TopicPartitions(p.topic)
if err != nil {
return err
}
oldNumPartitions := 0
newNumPartitions := len(partitions)
p.Lock()
defer p.Unlock()
oldProducers := p.producers
oldNumPartitions = len(oldProducers)
if oldProducers != nil {
if oldNumPartitions == newNumPartitions {
p.log.Debug("Number of partitions in topic has not changed")
return nil
}
p.log.WithField("old_partitions", oldNumPartitions).
WithField("new_partitions", newNumPartitions).
Info("Changed number of partitions in topic")
}
p.producers = make([]Producer, newNumPartitions)
// When for some reason (eg: forced deletion of sub partition) causes oldNumPartitions> newNumPartitions,
// we need to rebuild the cache of new producers, otherwise the array will be out of bounds.
if oldProducers != nil && oldNumPartitions < newNumPartitions {
// Copy over the existing consumer instances
for i := 0; i < oldNumPartitions; i++ {
p.producers[i] = oldProducers[i]
}
}
type ProducerError struct {
partition int
prod Producer
err error
}
startPartition := oldNumPartitions
partitionsToAdd := newNumPartitions - oldNumPartitions
if partitionsToAdd < 0 {
partitionsToAdd = newNumPartitions
startPartition = 0
}
c := make(chan ProducerError, partitionsToAdd)
for partitionIdx := startPartition; partitionIdx < newNumPartitions; partitionIdx++ {
partition := partitions[partitionIdx]
go func(partitionIdx int, partition string) {
prod, e := newPartitionProducer(p.client, partition, p.options, partitionIdx, p.metrics)
c <- ProducerError{
partition: partitionIdx,
prod: prod,
err: e,
}
}(partitionIdx, partition)
}
for i := 0; i < partitionsToAdd; i++ {
pe, ok := <-c
if ok {
if pe.err != nil {
err = pe.err
} else {
p.producers[pe.partition] = pe.prod
}
}
}
if err != nil {
// Since there were some failures, cleanup all the partitions that succeeded in creating the producers
for _, producer := range p.producers {
if producer != nil {
producer.Close()
}
}
return err
}
if newNumPartitions < oldNumPartitions {
p.metrics.ProducersPartitions.Set(float64(newNumPartitions))
} else {
p.metrics.ProducersPartitions.Add(float64(partitionsToAdd))
}
atomic.StorePointer(&p.producersPtr, unsafe.Pointer(&p.producers))
atomic.StoreUint32(&p.numPartitions, uint32(len(p.producers)))
return nil
}