in pulsar/producer_partition.go [203:343]
func (p *partitionProducer) grabCnx() error {
lr, err := p.client.lookupService.Lookup(p.topic)
if err != nil {
p.log.WithError(err).Warn("Failed to lookup topic")
return err
}
p.log.Debug("Lookup result: ", lr)
id := p.client.rpcClient.NewRequestID()
// set schema info for producer
var pbSchema *pb.Schema
if p.schemaInfo != nil {
tmpSchemaType := pb.Schema_Type(int32(p.schemaInfo.Type))
pbSchema = &pb.Schema{
Name: proto.String(p.schemaInfo.Name),
Type: &tmpSchemaType,
SchemaData: []byte(p.schemaInfo.Schema),
Properties: internal.ConvertFromStringMap(p.schemaInfo.Properties),
}
p.log.Debugf("The partition producer schema name is: %s", pbSchema.Name)
} else {
p.log.Debug("The partition producer schema is nil")
}
cmdProducer := &pb.CommandProducer{
RequestId: proto.Uint64(id),
Topic: proto.String(p.topic),
Encrypted: nil,
ProducerId: proto.Uint64(p.producerID),
Schema: pbSchema,
Epoch: proto.Uint64(atomic.LoadUint64(&p.epoch)),
UserProvidedProducerName: proto.Bool(p.userProvidedProducerName),
ProducerAccessMode: toProtoProducerAccessMode(p.options.ProducerAccessMode).Enum(),
}
if p.topicEpoch != nil {
cmdProducer.TopicEpoch = proto.Uint64(*p.topicEpoch)
}
if p.producerName != "" {
cmdProducer.ProducerName = proto.String(p.producerName)
}
if len(p.options.Properties) > 0 {
cmdProducer.Metadata = toKeyValues(p.options.Properties)
}
res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer)
if err != nil {
p.log.WithError(err).Error("Failed to create producer at send PRODUCER request")
if err == internal.ErrRequestTimeOut {
id := p.client.rpcClient.NewRequestID()
_, _ = p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_CLOSE_PRODUCER,
&pb.CommandCloseProducer{
ProducerId: &p.producerID,
RequestId: &id,
})
}
return err
}
p.producerName = res.Response.ProducerSuccess.GetProducerName()
nextTopicEpoch := res.Response.ProducerSuccess.GetTopicEpoch()
p.topicEpoch = &nextTopicEpoch
if p.options.Encryption != nil {
p.encryptor = internalcrypto.NewProducerEncryptor(p.options.Encryption.Keys,
p.options.Encryption.KeyReader,
p.options.Encryption.MessageCrypto,
p.options.Encryption.ProducerCryptoFailureAction, p.log)
} else {
p.encryptor = internalcrypto.NewNoopEncryptor()
}
if p.sequenceIDGenerator == nil {
nextSequenceID := uint64(res.Response.ProducerSuccess.GetLastSequenceId() + 1)
p.sequenceIDGenerator = &nextSequenceID
}
schemaVersion := res.Response.ProducerSuccess.GetSchemaVersion()
if len(schemaVersion) != 0 {
p.schemaCache.Put(p.schemaInfo, schemaVersion)
}
p._setConn(res.Cnx)
err = p._getConn().RegisterListener(p.producerID, p)
if err != nil {
return err
}
if !p.options.DisableBatching && p.batchBuilder == nil {
provider, err := GetBatcherBuilderProvider(p.options.BatcherBuilderType)
if err != nil {
return err
}
maxMessageSize := uint32(p._getConn().GetMaxMessageSize())
p.batchBuilder, err = provider(p.options.BatchingMaxMessages, p.options.BatchingMaxSize,
maxMessageSize, p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType),
compression.Level(p.options.CompressionLevel),
p,
p.log,
p.encryptor)
if err != nil {
return err
}
}
p.log.WithFields(log.Fields{
"cnx": res.Cnx.ID(),
"epoch": atomic.LoadUint64(&p.epoch),
}).Info("Connected producer")
pendingItems := p.pendingQueue.ReadableSlice()
viewSize := len(pendingItems)
if viewSize > 0 {
p.log.Infof("Resending %d pending batches", viewSize)
lastViewItem := pendingItems[viewSize-1].(*pendingItem)
// iterate at most pending items
for i := 0; i < viewSize; i++ {
item := p.pendingQueue.Poll()
if item == nil {
continue
}
pi := item.(*pendingItem)
// when resending pending batches, we update the sendAt timestamp and put to the back of queue
// to avoid pending item been removed by failTimeoutMessages and cause race condition
pi.Lock()
pi.sentAt = time.Now()
pi.Unlock()
p.pendingQueue.Put(pi)
p._getConn().WriteData(pi.buffer)
if pi == lastViewItem {
break
}
}
}
return nil
}