func()

in pulsar/producer_partition.go [259:405]


func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {
	lr, err := p.lookupTopic(assignedBrokerURL)
	if err != nil {
		return err
	}
	id := p.client.rpcClient.NewRequestID()

	// set schema info for producer

	var pbSchema *pb.Schema
	if p.schemaInfo != nil {
		tmpSchemaType := pb.Schema_Type(int32(p.schemaInfo.Type))
		pbSchema = &pb.Schema{
			Name:       proto.String(p.schemaInfo.Name),
			Type:       &tmpSchemaType,
			SchemaData: []byte(p.schemaInfo.Schema),
			Properties: internal.ConvertFromStringMap(p.schemaInfo.Properties),
		}
		p.log.Debugf("The partition producer schema name is: %s", pbSchema.Name)
	} else {
		p.log.Debug("The partition producer schema is nil")
	}

	cmdProducer := &pb.CommandProducer{
		RequestId:                proto.Uint64(id),
		Topic:                    proto.String(p.topic),
		Encrypted:                nil,
		ProducerId:               proto.Uint64(p.producerID),
		Schema:                   pbSchema,
		Epoch:                    proto.Uint64(atomic.LoadUint64(&p.epoch)),
		UserProvidedProducerName: proto.Bool(p.userProvidedProducerName),
		ProducerAccessMode:       toProtoProducerAccessMode(p.options.ProducerAccessMode).Enum(),
		InitialSubscriptionName:  proto.String(p.options.initialSubscriptionName),
	}

	if p.topicEpoch != nil {
		cmdProducer.TopicEpoch = proto.Uint64(*p.topicEpoch)
	}

	if p.producerName != "" {
		cmdProducer.ProducerName = proto.String(p.producerName)
	}

	if len(p.options.Properties) > 0 {
		cmdProducer.Metadata = toKeyValues(p.options.Properties)
	}

	cnx, err := p.client.cnxPool.GetConnection(lr.LogicalAddr, lr.PhysicalAddr, p.cnxKeySuffix)
	// registering the producer first in case broker sends commands in the middle
	if err != nil {
		p.log.Error("Failed to get connection")
		return err
	}

	p._setConn(cnx)
	err = p._getConn().RegisterListener(p.producerID, p)
	if err != nil {
		p.log.WithError(err).Errorf("Failed to register listener: {%d}", p.producerID)
	}

	res, err := p.client.rpcClient.RequestOnCnx(cnx, id, pb.BaseCommand_PRODUCER, cmdProducer)
	if err != nil {
		p._getConn().UnregisterListener(p.producerID)
		p.log.WithError(err).Error("Failed to create producer at send PRODUCER request")
		if errors.Is(err, internal.ErrRequestTimeOut) {
			id := p.client.rpcClient.NewRequestID()
			_, _ = p.client.rpcClient.RequestOnCnx(cnx, id, pb.BaseCommand_CLOSE_PRODUCER,
				&pb.CommandCloseProducer{
					ProducerId: &p.producerID,
					RequestId:  &id,
				})
		}
		return err
	}

	p.producerName = res.Response.ProducerSuccess.GetProducerName()
	nextTopicEpoch := res.Response.ProducerSuccess.GetTopicEpoch()
	p.topicEpoch = &nextTopicEpoch

	if p.options.Encryption != nil {
		p.encryptor = internalcrypto.NewProducerEncryptor(p.options.Encryption.Keys,
			p.options.Encryption.KeyReader,
			p.options.Encryption.MessageCrypto,
			p.options.Encryption.ProducerCryptoFailureAction, p.log)
	} else {
		p.encryptor = internalcrypto.NewNoopEncryptor()
	}

	if p.sequenceIDGenerator == nil {
		nextSequenceID := uint64(res.Response.ProducerSuccess.GetLastSequenceId() + 1)
		p.sequenceIDGenerator = &nextSequenceID
	}

	schemaVersion := res.Response.ProducerSuccess.GetSchemaVersion()
	if len(schemaVersion) != 0 {
		p.schemaCache.Put(p.schemaInfo, schemaVersion)
	}

	if !p.options.DisableBatching && p.batchBuilder == nil {
		provider, err := GetBatcherBuilderProvider(p.options.BatcherBuilderType)
		if err != nil {
			return err
		}
		maxMessageSize := uint32(p._getConn().GetMaxMessageSize())
		p.batchBuilder, err = provider(p.options.BatchingMaxMessages, p.options.BatchingMaxSize,
			maxMessageSize, p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType),
			compression.Level(p.options.CompressionLevel),
			p,
			p.log,
			p.encryptor)
		if err != nil {
			return err
		}
	}

	p.log.WithFields(log.Fields{
		"cnx":   res.Cnx.ID(),
		"epoch": atomic.LoadUint64(&p.epoch),
	}).Info("Connected producer")

	pendingItems := p.pendingQueue.ReadableSlice()
	viewSize := len(pendingItems)
	if viewSize > 0 {
		p.log.Infof("Resending %d pending batches", viewSize)
		lastViewItem := pendingItems[viewSize-1].(*pendingItem)

		// iterate at most pending items
		for i := 0; i < viewSize; i++ {
			item := p.pendingQueue.Poll()
			if item == nil {
				continue
			}
			pi := item.(*pendingItem)
			// when resending pending batches, we update the sendAt timestamp to record the metric.
			pi.Lock()
			pi.sentAt = time.Now()
			pi.Unlock()
			p.pendingQueue.Put(pi)
			p._getConn().WriteData(pi.ctx, pi.buffer)

			if pi == lastViewItem {
				break
			}
		}
	}
	return nil
}