func newPartitionProducer()

in pulsar/producer_partition.go [123:201]


// newPartitionProducer builds the producer for one partition of topic,
// establishes its connection through grabCnx, and starts its background
// goroutines.
//
// Option defaults applied here:
//   - options.BatchingMaxPublishDelay <= 0 falls back to
//     defaultBatchingMaxPublishDelay.
//   - options.MaxPendingMessages <= 0 falls back to 1000.
//
// On success the producer is in state producerReady and runEventsLoop is
// running (plus failTimeoutMessages when options.SendTimeout > 0). If grabCnx
// fails, the flush ticker is stopped, the error is logged and returned, and no
// goroutine is started.
func newPartitionProducer(client *client, topic string, options *ProducerOptions, partitionIdx int,
	metrics *internal.LeveledMetrics) (
	*partitionProducer, error) {
	// Fallback queue depth used when the caller did not supply a usable one.
	const fallbackMaxPendingMessages = 1000

	// time.NewTicker panics on a non-positive interval, so a negative delay
	// must be treated like an unset one rather than passed through.
	batchingMaxPublishDelay := options.BatchingMaxPublishDelay
	if batchingMaxPublishDelay <= 0 {
		batchingMaxPublishDelay = defaultBatchingMaxPublishDelay
	}

	// A negative capacity makes make(chan ...) panic, so only strictly
	// positive values are honoured.
	maxPendingMessages := options.MaxPendingMessages
	if maxPendingMessages <= 0 {
		maxPendingMessages = fallbackMaxPendingMessages
	}

	logger := client.log.SubLogger(log.Fields{"topic": topic})

	p := &partitionProducer{
		client:           client,
		topic:            topic,
		log:              logger,
		options:          options,
		producerID:       client.rpcClient.NewProducerID(),
		dataChan:         make(chan *sendRequest, maxPendingMessages),
		cmdChan:          make(chan interface{}, 10),
		connectClosedCh:  make(chan connectionClosed, 10),
		batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
		compressionProvider: internal.GetCompressionProvider(pb.CompressionType(options.CompressionType),
			compression.Level(options.CompressionLevel)),
		publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),
		pendingQueue:     internal.NewBlockingQueue(maxPendingMessages),
		lastSequenceID:   -1,
		partitionIdx:     int32(partitionIdx),
		metrics:          metrics,
		schemaCache:      newSchemaCache(),
	}
	// The ticker is always allocated so the field is never nil; with batching
	// disabled it is stopped right away so it never fires.
	if p.options.DisableBatching {
		p.batchFlushTicker.Stop()
	}
	p.setProducerState(producerInit)

	// schemaInfo stays nil unless the configured schema actually provides one.
	if options.Schema != nil {
		if info := options.Schema.GetSchemaInfo(); info != nil {
			p.schemaInfo = info
		}
	}

	// Remember whether the name came from the caller; when it did not,
	// producerName is left empty at this point.
	if options.Name != "" {
		p.producerName = options.Name
		p.userProvidedProducerName = true
	}

	if err := p.grabCnx(); err != nil {
		// Release the ticker: the producer is discarded and nothing else
		// would ever stop it.
		p.batchFlushTicker.Stop()
		logger.WithError(err).Error("Failed to create producer at newPartitionProducer")
		return nil, err
	}

	// Re-scope the logger only after grabCnx has returned, using the
	// producer name as it stands at that point.
	p.log = p.log.SubLogger(log.Fields{
		"producer_name": p.producerName,
		"producerID":    p.producerID,
	})

	p.log.WithField("cnx", p._getConn().ID()).Info("Created producer")
	p.setProducerState(producerReady)

	if p.options.SendTimeout > 0 {
		go p.failTimeoutMessages()
	}

	go p.runEventsLoop()

	return p, nil
}