in pulsar/producer_partition.go [123:201]
// newPartitionProducer builds the producer for a single partition of topic,
// attaches it to a connection via grabCnx and, on success, starts its
// background goroutines.
//
// Option defaults applied here:
//   - BatchingMaxPublishDelay: any non-positive value falls back to
//     defaultBatchingMaxPublishDelay (time.NewTicker panics on a
//     non-positive interval).
//   - MaxPendingMessages: any non-positive value falls back to 1000
//     (make panics on a negative channel capacity).
//
// On failure the flush ticker is stopped, the error is logged and returned,
// and no goroutine is started. On success the producer is in the
// producerReady state and runEventsLoop is running; failTimeoutMessages is
// started as well when options.SendTimeout is positive.
func newPartitionProducer(client *client, topic string, options *ProducerOptions, partitionIdx int,
	metrics *internal.LeveledMetrics) (
	*partitionProducer, error) {
	// Treat zero and negative values alike as "unset": a negative delay would
	// otherwise make time.NewTicker panic below.
	batchingMaxPublishDelay := defaultBatchingMaxPublishDelay
	if options.BatchingMaxPublishDelay > 0 {
		batchingMaxPublishDelay = options.BatchingMaxPublishDelay
	}

	// Same for the pending-message bound: it is used as a channel capacity,
	// a semaphore size and a queue size, none of which accept negatives.
	maxPendingMessages := 1000
	if options.MaxPendingMessages > 0 {
		maxPendingMessages = options.MaxPendingMessages
	}

	logger := client.log.SubLogger(log.Fields{"topic": topic})

	p := &partitionProducer{
		client:           client,
		topic:            topic,
		log:              logger,
		options:          options,
		producerID:       client.rpcClient.NewProducerID(),
		dataChan:         make(chan *sendRequest, maxPendingMessages),
		cmdChan:          make(chan interface{}, 10),
		connectClosedCh:  make(chan connectionClosed, 10),
		batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
		compressionProvider: internal.GetCompressionProvider(pb.CompressionType(options.CompressionType),
			compression.Level(options.CompressionLevel)),
		publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),
		pendingQueue:     internal.NewBlockingQueue(maxPendingMessages),
		lastSequenceID:   -1,
		partitionIdx:     int32(partitionIdx),
		metrics:          metrics,
		epoch:            0,
		schemaCache:      newSchemaCache(),
	}

	// The ticker is always allocated so the field is never nil; with batching
	// disabled it is stopped right away so it never fires.
	if p.options.DisableBatching {
		p.batchFlushTicker.Stop()
	}
	p.setProducerState(producerInit)

	// schemaInfo stays at its zero value unless the schema supplies one.
	if options.Schema != nil {
		if info := options.Schema.GetSchemaInfo(); info != nil {
			p.schemaInfo = info
		}
	}

	// Record whether the name came from the caller; when it did not,
	// producerName is left empty at this point.
	if options.Name != "" {
		p.producerName = options.Name
		p.userProvidedProducerName = true
	}

	if err := p.grabCnx(); err != nil {
		// Release the ticker: no goroutine has been started that would own it.
		p.batchFlushTicker.Stop()
		logger.WithError(err).Error("Failed to create producer at newPartitionProducer")
		return nil, err
	}

	// Re-scope the logger only after grabCnx has returned, so the fields
	// reflect the producer name as it stands once the connection is set up.
	p.log = p.log.SubLogger(log.Fields{
		"producer_name": p.producerName,
		"producerID":    p.producerID,
	})
	p.log.WithField("cnx", p._getConn().ID()).Info("Created producer")
	p.setProducerState(producerReady)

	if p.options.SendTimeout > 0 {
		go p.failTimeoutMessages()
	}
	go p.runEventsLoop()

	return p, nil
}