func()

in pulsar/producer_partition.go [203:343]


func (p *partitionProducer) grabCnx() error {
	lr, err := p.client.lookupService.Lookup(p.topic)
	if err != nil {
		p.log.WithError(err).Warn("Failed to lookup topic")
		return err
	}

	p.log.Debug("Lookup result: ", lr)
	id := p.client.rpcClient.NewRequestID()

	// set schema info for producer

	var pbSchema *pb.Schema
	if p.schemaInfo != nil {
		tmpSchemaType := pb.Schema_Type(int32(p.schemaInfo.Type))
		pbSchema = &pb.Schema{
			Name:       proto.String(p.schemaInfo.Name),
			Type:       &tmpSchemaType,
			SchemaData: []byte(p.schemaInfo.Schema),
			Properties: internal.ConvertFromStringMap(p.schemaInfo.Properties),
		}
		p.log.Debugf("The partition producer schema name is: %s", pbSchema.Name)
	} else {
		p.log.Debug("The partition producer schema is nil")
	}

	cmdProducer := &pb.CommandProducer{
		RequestId:                proto.Uint64(id),
		Topic:                    proto.String(p.topic),
		Encrypted:                nil,
		ProducerId:               proto.Uint64(p.producerID),
		Schema:                   pbSchema,
		Epoch:                    proto.Uint64(atomic.LoadUint64(&p.epoch)),
		UserProvidedProducerName: proto.Bool(p.userProvidedProducerName),
		ProducerAccessMode:       toProtoProducerAccessMode(p.options.ProducerAccessMode).Enum(),
	}

	if p.topicEpoch != nil {
		cmdProducer.TopicEpoch = proto.Uint64(*p.topicEpoch)
	}

	if p.producerName != "" {
		cmdProducer.ProducerName = proto.String(p.producerName)
	}

	if len(p.options.Properties) > 0 {
		cmdProducer.Metadata = toKeyValues(p.options.Properties)
	}
	res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer)
	if err != nil {
		p.log.WithError(err).Error("Failed to create producer at send PRODUCER request")
		if err == internal.ErrRequestTimeOut {
			id := p.client.rpcClient.NewRequestID()
			_, _ = p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_CLOSE_PRODUCER,
				&pb.CommandCloseProducer{
					ProducerId: &p.producerID,
					RequestId:  &id,
				})
		}
		return err
	}

	p.producerName = res.Response.ProducerSuccess.GetProducerName()
	nextTopicEpoch := res.Response.ProducerSuccess.GetTopicEpoch()
	p.topicEpoch = &nextTopicEpoch

	if p.options.Encryption != nil {
		p.encryptor = internalcrypto.NewProducerEncryptor(p.options.Encryption.Keys,
			p.options.Encryption.KeyReader,
			p.options.Encryption.MessageCrypto,
			p.options.Encryption.ProducerCryptoFailureAction, p.log)
	} else {
		p.encryptor = internalcrypto.NewNoopEncryptor()
	}

	if p.sequenceIDGenerator == nil {
		nextSequenceID := uint64(res.Response.ProducerSuccess.GetLastSequenceId() + 1)
		p.sequenceIDGenerator = &nextSequenceID
	}

	schemaVersion := res.Response.ProducerSuccess.GetSchemaVersion()
	if len(schemaVersion) != 0 {
		p.schemaCache.Put(p.schemaInfo, schemaVersion)
	}

	p._setConn(res.Cnx)
	err = p._getConn().RegisterListener(p.producerID, p)
	if err != nil {
		return err
	}

	if !p.options.DisableBatching && p.batchBuilder == nil {
		provider, err := GetBatcherBuilderProvider(p.options.BatcherBuilderType)
		if err != nil {
			return err
		}
		maxMessageSize := uint32(p._getConn().GetMaxMessageSize())
		p.batchBuilder, err = provider(p.options.BatchingMaxMessages, p.options.BatchingMaxSize,
			maxMessageSize, p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType),
			compression.Level(p.options.CompressionLevel),
			p,
			p.log,
			p.encryptor)
		if err != nil {
			return err
		}
	}

	p.log.WithFields(log.Fields{
		"cnx":   res.Cnx.ID(),
		"epoch": atomic.LoadUint64(&p.epoch),
	}).Info("Connected producer")

	pendingItems := p.pendingQueue.ReadableSlice()
	viewSize := len(pendingItems)
	if viewSize > 0 {
		p.log.Infof("Resending %d pending batches", viewSize)
		lastViewItem := pendingItems[viewSize-1].(*pendingItem)

		// iterate at most pending items
		for i := 0; i < viewSize; i++ {
			item := p.pendingQueue.Poll()
			if item == nil {
				continue
			}
			pi := item.(*pendingItem)
			// when resending pending batches, we update the sendAt timestamp and put to the back of queue
			// to avoid pending item been removed by failTimeoutMessages and cause race condition
			pi.Lock()
			pi.sentAt = time.Now()
			pi.Unlock()
			p.pendingQueue.Put(pi)
			p._getConn().WriteData(pi.buffer)

			if pi == lastViewItem {
				break
			}
		}
	}
	return nil
}