func newProducer()

in pulsar/producer_impl.go [76:161]


// newProducer validates options, fills in defaults for unset fields, and
// builds a producer bound to options.Topic.
//
// The options value is modified in place: zero or non-positive tunables are
// replaced with the package defaults, nil Interceptors are replaced with
// defaultProducerInterceptors, a schema whose type is NONE is replaced with a
// bytes schema, and a missing MessageCrypto is filled in when encryption keys
// are configured.
//
// An error is returned when the topic is empty, when batching and chunking
// are both enabled, when encryption keys are set without a KeyReader, when
// the default MessageCrypto cannot be created, or when
// internalCreatePartitionsProducers fails.
func newProducer(client *client, options *ProducerOptions) (*producer, error) {
	topic := options.Topic
	if topic == "" {
		return nil, newError(InvalidTopicName, "Topic name is required for producer")
	}

	// Fall back to package defaults for anything the caller left unset.
	if options.SendTimeout == 0 {
		options.SendTimeout = defaultSendTimeout
	}
	if options.BatchingMaxMessages == 0 {
		options.BatchingMaxMessages = defaultMaxMessagesPerBatch
	}
	if options.BatchingMaxSize == 0 {
		options.BatchingMaxSize = defaultMaxBatchSize
	}
	if options.BatchingMaxPublishDelay <= 0 {
		options.BatchingMaxPublishDelay = defaultBatchingMaxPublishDelay
	}
	if options.PartitionsAutoDiscoveryInterval <= 0 {
		options.PartitionsAutoDiscoveryInterval = defaultPartitionsAutoDiscoveryInterval
	}

	// Chunking is only accepted when batching has been explicitly disabled.
	if options.EnableChunking && !options.DisableBatching {
		return nil, fmt.Errorf("batching and chunking can not be enabled together")
	}

	p := &producer{
		options: options,
		topic:   topic,
		client:  client,
		log:     client.log.SubLogger(log.Fields{"topic": topic}),
		metrics: client.metrics.GetLeveledMetrics(topic),
	}

	if options.Interceptors == nil {
		options.Interceptors = defaultProducerInterceptors
	}

	// Prefer the caller's router; otherwise wrap the default router so it
	// receives the partition count taken from the topic metadata.
	p.messageRouter = options.MessageRouter
	if p.messageRouter == nil {
		route := NewDefaultRouter(
			getHashingFunction(options.HashingScheme),
			options.BatchingMaxMessages,
			options.BatchingMaxSize,
			options.BatchingMaxPublishDelay,
			options.DisableBatching)
		p.messageRouter = func(message *ProducerMessage, metadata TopicMetadata) int {
			return route(message, metadata.NumPartitions())
		}
	}

	// A schema that reports type NONE is swapped for a bytes schema.
	if options.Schema != nil &&
		options.Schema.GetSchemaInfo() != nil &&
		options.Schema.GetSchemaInfo().Type == NONE {
		options.Schema = NewBytesSchema(nil)
	}

	// Encryption counts as enabled only when at least one key is configured.
	if enc := options.Encryption; enc != nil && len(enc.Keys) > 0 {
		switch {
		case enc.KeyReader == nil:
			return nil, fmt.Errorf("encryption is enabled, KeyReader can not be nil")
		case enc.MessageCrypto == nil:
			// No crypto implementation supplied: create the default one.
			cryptoLogCtx := fmt.Sprintf("[%v] [%v]", p.topic, p.options.Name)
			defaultCrypto, err := crypto.NewDefaultMessageCrypto(
				cryptoLogCtx,
				true,
				client.log.SubLogger(log.Fields{"topic": p.topic}),
			)
			if err != nil {
				return nil, fmt.Errorf("unable to get MessageCrypto instance. Producer creation is abandoned. %w", err)
			}
			p.options.Encryption.MessageCrypto = defaultCrypto
		}
	}

	if err := p.internalCreatePartitionsProducers(); err != nil {
		return nil, err
	}

	// Background partition discovery starts only after the partition
	// producers were created successfully.
	p.stopDiscovery = p.runBackgroundPartitionDiscovery(options.PartitionsAutoDiscoveryInterval)

	p.metrics.ProducersOpened.Inc()
	return p, nil
}