in pulsar/producer_partition.go [804:857]
// internalSingleSend serializes the payload of one send request with
// internal.SingleSend and, on success, registers a pendingItem for it and
// writes the serialized buffer to the producer's connection.
//
// mm is the message metadata; its SequenceId is dereferenced, so it must be
// non-nil. compressedPayload is copied into a fresh internal buffer before
// serialization. maxMessageSize is forwarded unchanged to internal.SingleSend.
//
// If serialization fails, the request's callback is invoked with the error,
// the request's reserved memory is handed to releaseSemaphoreAndMem, the
// failure is logged, and nothing is enqueued or written.
func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata,
	compressedPayload []byte,
	request *sendRequest,
	maxMessageSize uint32) {
	// Captured up front; used only for the failure log below.
	producerMsg := request.msg

	// Copy the payload into its own buffer for the serializer.
	payload := internal.NewBuffer(len(compressedPayload))
	payload.Write(compressedPayload)

	// GetBuffer may yield nil; in that case allocate a buffer sized at
	// 1.5x the readable payload bytes.
	frame := p.GetBuffer()
	if frame == nil {
		frame = internal.NewBuffer(int(payload.ReadableBytes() * 3 / 2))
	}

	sequenceID := *mm.SequenceId

	// Transaction coordinates stay at their zero values unless the request
	// carries a transaction.
	var (
		inTxn                  bool
		txnMostSig, txnLeastSig uint64
	)
	if txn := request.transaction; txn != nil {
		id := txn.GetTxnID()
		inTxn = true
		txnMostSig = id.MostSigBits
		txnLeastSig = id.LeastSigBits
	}

	serializeErr := internal.SingleSend(
		frame,
		p.producerID,
		sequenceID,
		mm,
		payload,
		p.encryptor,
		maxMessageSize,
		inTxn,
		txnMostSig,
		txnLeastSig,
	)
	if serializeErr != nil {
		// Order matters here: notify the caller, release the reservation,
		// then log.
		runCallback(request.callback, nil, request.msg, serializeErr)
		p.releaseSemaphoreAndMem(request.reservedMem)
		p.log.WithError(serializeErr).Errorf("Single message serialize failed %s", producerMsg.Value)
		return
	}

	// Enqueue the pending item before the buffer is written to the
	// connection.
	item := &pendingItem{
		sentAt:       time.Now(),
		buffer:       frame,
		sequenceID:   sequenceID,
		sendRequests: []interface{}{request},
	}
	p.pendingQueue.Put(item)
	p._getConn().WriteData(frame)
}