in pulsar/producer_partition.go [607:689]
func (p *partitionProducer) internalSend(sr *sendRequest) {
p.log.Debug("Received send request: ", *sr.msg)
if sr.sendAsBatch {
smm := p.genSingleMessageMetadataInBatch(sr.msg, int(sr.uncompressedSize))
multiSchemaEnabled := !p.options.DisableMultiSchema
added := addRequestToBatch(
smm, p, sr.uncompressedPayload, sr, sr.msg, sr.deliverAt, sr.schemaVersion, multiSchemaEnabled)
if !added {
// The current batch is full. flush it and retry
p.internalFlushCurrentBatch()
// after flushing try again to add the current payload
ok := addRequestToBatch(
smm, p, sr.uncompressedPayload, sr, sr.msg, sr.deliverAt, sr.schemaVersion, multiSchemaEnabled)
if !ok {
p.log.WithField("size", sr.uncompressedSize).
WithField("properties", sr.msg.Properties).
Error("unable to add message to batch")
sr.done(nil, ErrFailAddToBatch)
return
}
}
if sr.flushImmediately {
p.internalFlushCurrentBatch()
}
return
}
if sr.totalChunks <= 1 {
p.internalSingleSend(sr.mm, sr.compressedPayload, sr, uint32(sr.maxMessageSize))
return
}
var lhs, rhs int
uuid := fmt.Sprintf("%s-%s", p.producerName, strconv.FormatUint(*sr.mm.SequenceId, 10))
sr.mm.Uuid = proto.String(uuid)
sr.mm.NumChunksFromMsg = proto.Int32(int32(sr.totalChunks))
sr.mm.TotalChunkMsgSize = proto.Int32(int32(sr.compressedSize))
cr := newChunkRecorder()
for chunkID := 0; chunkID < sr.totalChunks; chunkID++ {
lhs = chunkID * sr.payloadChunkSize
if rhs = lhs + sr.payloadChunkSize; rhs > sr.compressedSize {
rhs = sr.compressedSize
}
// update chunk id
sr.mm.ChunkId = proto.Int32(int32(chunkID))
nsr := sendRequestPool.Get().(*sendRequest)
*nsr = sendRequest{
pool: sendRequestPool,
ctx: sr.ctx,
msg: sr.msg,
producer: sr.producer,
callback: sr.callback,
callbackOnce: sr.callbackOnce,
publishTime: sr.publishTime,
flushImmediately: sr.flushImmediately,
totalChunks: sr.totalChunks,
chunkID: chunkID,
uuid: uuid,
chunkRecorder: cr,
transaction: sr.transaction,
memLimit: sr.memLimit,
semaphore: sr.semaphore,
reservedMem: int64(rhs - lhs),
sendAsBatch: sr.sendAsBatch,
schema: sr.schema,
schemaVersion: sr.schemaVersion,
uncompressedPayload: sr.uncompressedPayload,
uncompressedSize: sr.uncompressedSize,
compressedPayload: sr.compressedPayload,
compressedSize: sr.compressedSize,
payloadChunkSize: sr.payloadChunkSize,
mm: sr.mm,
deliverAt: sr.deliverAt,
maxMessageSize: sr.maxMessageSize,
}
p.internalSingleSend(nsr.mm, nsr.compressedPayload[lhs:rhs], nsr, uint32(nsr.maxMessageSize))
}
}