in pulsar/producer_impl.go [76:161]
// newProducer builds a producer for options.Topic on top of the given client.
//
// The options value is modified in place:
//   - SendTimeout, BatchingMaxMessages and BatchingMaxSize are replaced by the
//     package defaults when they are zero;
//   - BatchingMaxPublishDelay and PartitionsAutoDiscoveryInterval are replaced
//     by the package defaults when they are zero or negative;
//   - nil Interceptors are replaced by defaultProducerInterceptors;
//   - a Schema whose schema info reports type NONE is replaced by
//     NewBytesSchema(nil);
//   - when encryption keys are configured without a MessageCrypto, a default
//     one is created and stored on options.Encryption.
//
// An error is returned when the topic name is empty, when batching and
// chunking are both enabled, when encryption keys are configured without a
// KeyReader, when the default MessageCrypto cannot be created, or when
// internalCreatePartitionsProducers fails. On success, background partition
// discovery has been started and the ProducersOpened metric incremented.
func newProducer(client *client, options *ProducerOptions) (*producer, error) {
	if options.Topic == "" {
		return nil, newError(InvalidTopicName, "Topic name is required for producer")
	}

	// Fill in defaults for every tunable the caller left unset.
	if options.SendTimeout == 0 {
		options.SendTimeout = defaultSendTimeout
	}
	if options.BatchingMaxMessages == 0 {
		options.BatchingMaxMessages = defaultMaxMessagesPerBatch
	}
	if options.BatchingMaxSize == 0 {
		options.BatchingMaxSize = defaultMaxBatchSize
	}
	if options.BatchingMaxPublishDelay <= 0 {
		options.BatchingMaxPublishDelay = defaultBatchingMaxPublishDelay
	}
	if options.PartitionsAutoDiscoveryInterval <= 0 {
		options.PartitionsAutoDiscoveryInterval = defaultPartitionsAutoDiscoveryInterval
	}

	// Batching is on unless explicitly disabled, so chunking requires
	// DisableBatching to be set.
	if options.EnableChunking && !options.DisableBatching {
		return nil, fmt.Errorf("batching and chunking can not be enabled together")
	}

	topicLogger := client.log.SubLogger(log.Fields{"topic": options.Topic})
	topicMetrics := client.metrics.GetLeveledMetrics(options.Topic)
	prod := &producer{
		options: options,
		topic:   options.Topic,
		client:  client,
		log:     topicLogger,
		metrics: topicMetrics,
	}

	if options.Interceptors == nil {
		options.Interceptors = defaultProducerInterceptors
	}

	// Prefer the caller's router; otherwise adapt the default router, which
	// takes a partition count, to the TopicMetadata-based signature.
	if options.MessageRouter != nil {
		prod.messageRouter = options.MessageRouter
	} else {
		route := NewDefaultRouter(
			getHashingFunction(options.HashingScheme),
			options.BatchingMaxMessages,
			options.BatchingMaxSize,
			options.BatchingMaxPublishDelay,
			options.DisableBatching)
		prod.messageRouter = func(msg *ProducerMessage, tm TopicMetadata) int {
			return route(msg, tm.NumPartitions())
		}
	}

	// A schema that declares type NONE is swapped for a bytes schema.
	if options.Schema != nil &&
		options.Schema.GetSchemaInfo() != nil &&
		options.Schema.GetSchemaInfo().Type == NONE {
		options.Schema = NewBytesSchema(nil)
	}

	// Encryption is considered enabled only when at least one key is set.
	if enc := options.Encryption; enc != nil && len(enc.Keys) > 0 {
		if enc.KeyReader == nil {
			return nil, fmt.Errorf("encryption is enabled, KeyReader can not be nil")
		}
		if enc.MessageCrypto == nil {
			// No MessageCrypto supplied: create the default one.
			logPrefix := fmt.Sprintf("[%v] [%v]", prod.topic, prod.options.Name)
			defaultCrypto, err := crypto.NewDefaultMessageCrypto(
				logPrefix,
				true,
				client.log.SubLogger(log.Fields{"topic": prod.topic}))
			if err != nil {
				return nil, fmt.Errorf("unable to get MessageCrypto instance. Producer creation is abandoned. %w", err)
			}
			prod.options.Encryption.MessageCrypto = defaultCrypto
		}
	}

	if err := prod.internalCreatePartitionsProducers(); err != nil {
		return nil, err
	}

	prod.stopDiscovery = prod.runBackgroundPartitionDiscovery(options.PartitionsAutoDiscoveryInterval)
	prod.metrics.ProducersOpened.Inc()
	return prod, nil
}