func()

in pulsar/producer_partition.go [804:857]


// internalSingleSend serializes one non-batched message with
// internal.SingleSend and hands the resulting buffer to the connection.
//
// mm is the message metadata; its SequenceId must be non-nil because it is
// dereferenced here. compressedPayload is copied into a fresh buffer before
// serialization. request supplies the callback, the original message, the
// optional transaction and the reserved memory to release on failure.
// maxMessageSize is forwarded unchanged to internal.SingleSend.
//
// On a serialization error the request callback is run with that error, the
// reservation is released through releaseSemaphoreAndMem, the failure is
// logged, and nothing is queued or written. On success a pendingItem is put
// on the pending queue before the buffer is written to the connection.
func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata,
	compressedPayload []byte,
	request *sendRequest,
	maxMessageSize uint32) {
	// Give the serializer its own copy of the compressed bytes.
	payload := internal.NewBuffer(len(compressedPayload))
	payload.Write(compressedPayload)

	// Use the producer-supplied buffer when there is one; otherwise allocate
	// a new buffer at 1.5x the payload size.
	out := p.GetBuffer()
	if out == nil {
		out = internal.NewBuffer(int(payload.ReadableBytes() * 3 / 2))
	}

	seqID := *mm.SequenceId

	// The transaction ID halves stay zero unless the request carries a
	// transaction.
	var txnMostBits, txnLeastBits uint64
	inTxn := request.transaction != nil
	if inTxn {
		id := request.transaction.GetTxnID()
		txnMostBits, txnLeastBits = id.MostSigBits, id.LeastSigBits
	}

	if err := internal.SingleSend(
		out,
		p.producerID,
		seqID,
		mm,
		payload,
		p.encryptor,
		maxMessageSize,
		inTxn,
		txnMostBits,
		txnLeastBits,
	); err != nil {
		// Notify the caller first, then give back the reservation, then log.
		runCallback(request.callback, nil, request.msg, err)
		p.releaseSemaphoreAndMem(request.reservedMem)
		p.log.WithError(err).Errorf("Single message serialize failed %s", request.msg.Value)
		return
	}

	// Queue the item before writing the buffer to the connection.
	item := &pendingItem{
		sentAt:       time.Now(),
		buffer:       out,
		sequenceID:   seqID,
		sendRequests: []interface{}{request},
	}
	p.pendingQueue.Put(item)
	p._getConn().WriteData(out)
}