in pulsar/producer_partition.go [481:707]
func (p *partitionProducer) internalSend(request *sendRequest) {
p.log.Debug("Received send request: ", *request.msg)
msg := request.msg
// read payload from message
uncompressedPayload := msg.Payload
var schemaPayload []byte
var err error
// The block chan must be closed when returned with exception
defer request.stopBlock()
if !p.canAddToQueue(request) {
return
}
if p.options.DisableMultiSchema {
if msg.Schema != nil && p.options.Schema != nil &&
msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
runCallback(request.callback, nil, request.msg, fmt.Errorf("msg schema can not match with producer schema"))
p.log.WithError(err).Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
return
}
}
var schema Schema
var schemaVersion []byte
if msg.Schema != nil {
schema = msg.Schema
} else if p.options.Schema != nil {
schema = p.options.Schema
}
if msg.Value != nil {
// payload and schema are mutually exclusive
// try to get payload from schema value only if payload is not set
if uncompressedPayload == nil && schema != nil {
schemaPayload, err = schema.Encode(msg.Value)
if err != nil {
runCallback(request.callback, nil, request.msg, newError(SchemaFailure, err.Error()))
p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value)
return
}
}
}
if uncompressedPayload == nil {
uncompressedPayload = schemaPayload
}
if schema != nil {
schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo())
if schemaVersion == nil {
schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
if err != nil {
p.log.WithError(err).Error("get schema version fail")
runCallback(request.callback, nil, request.msg, fmt.Errorf("get schema version fail, err: %w", err))
return
}
p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
}
}
uncompressedSize := len(uncompressedPayload)
// try to reserve memory for uncompressedPayload
if !p.canReserveMem(request, int64(uncompressedSize)) {
return
}
deliverAt := msg.DeliverAt
if msg.DeliverAfter.Nanoseconds() > 0 {
deliverAt = time.Now().Add(msg.DeliverAfter)
}
mm := p.genMetadata(msg, uncompressedSize, deliverAt)
// set default ReplicationClusters when DisableReplication
if msg.DisableReplication {
msg.ReplicationClusters = []string{"__local__"}
}
sendAsBatch := !p.options.DisableBatching &&
msg.ReplicationClusters == nil &&
deliverAt.UnixNano() < 0
// Once the batching is enabled, it can close blockCh early to make block finish
if sendAsBatch {
request.stopBlock()
} else {
// update sequence id for metadata, make the size of msgMetadata more accurate
// batch sending will update sequence ID in the BatchBuilder
p.updateMetadataSeqID(mm, msg)
}
maxMessageSize := int(p._getConn().GetMaxMessageSize())
// compress payload if not batching
var compressedPayload []byte
var compressedSize int
var checkSize int
if !sendAsBatch {
compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
compressedSize = len(compressedPayload)
checkSize = compressedSize
// set the compress type in msgMetaData
compressionType := pb.CompressionType(p.options.CompressionType)
if compressionType != pb.CompressionType_NONE {
mm.Compression = &compressionType
}
} else {
// final check for batching message is in serializeMessage
// this is a double check
checkSize = uncompressedSize
}
// if msg is too large and chunking is disabled
if checkSize > maxMessageSize && !p.options.EnableChunking {
p.releaseSemaphoreAndMem(int64(uncompressedSize))
runCallback(request.callback, nil, request.msg, errMessageTooLarge)
p.log.WithError(errMessageTooLarge).
WithField("size", checkSize).
WithField("properties", msg.Properties).
Errorf("MaxMessageSize %d", maxMessageSize)
p.metrics.PublishErrorsMsgTooLarge.Inc()
return
}
var totalChunks int
// max chunk payload size
var payloadChunkSize int
if sendAsBatch || !p.options.EnableChunking {
totalChunks = 1
payloadChunkSize = int(p._getConn().GetMaxMessageSize())
} else {
payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - proto.Size(mm)
if payloadChunkSize <= 0 {
p.releaseSemaphoreAndMem(int64(uncompressedSize))
runCallback(request.callback, nil, msg, errMetaTooLarge)
p.log.WithError(errMetaTooLarge).
WithField("metadata size", proto.Size(mm)).
WithField("properties", msg.Properties).
Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
p.metrics.PublishErrorsMsgTooLarge.Inc()
return
}
// set ChunkMaxMessageSize
if p.options.ChunkMaxMessageSize != 0 {
payloadChunkSize = int(math.Min(float64(payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
}
totalChunks = int(math.Max(1, math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
}
// set total chunks to send request
request.totalChunks = totalChunks
if !sendAsBatch {
if totalChunks > 1 {
var lhs, rhs int
uuid := fmt.Sprintf("%s-%s", p.producerName, strconv.FormatUint(*mm.SequenceId, 10))
mm.Uuid = proto.String(uuid)
mm.NumChunksFromMsg = proto.Int32(int32(totalChunks))
mm.TotalChunkMsgSize = proto.Int32(int32(compressedSize))
cr := newChunkRecorder()
for chunkID := 0; chunkID < totalChunks; chunkID++ {
lhs = chunkID * payloadChunkSize
if rhs = lhs + payloadChunkSize; rhs > compressedSize {
rhs = compressedSize
}
// update chunk id
mm.ChunkId = proto.Int32(int32(chunkID))
nsr := &sendRequest{
ctx: request.ctx,
msg: request.msg,
callback: request.callback,
callbackOnce: request.callbackOnce,
publishTime: request.publishTime,
blockCh: request.blockCh,
closeBlockChOnce: request.closeBlockChOnce,
totalChunks: totalChunks,
chunkID: chunkID,
uuid: uuid,
chunkRecorder: cr,
transaction: request.transaction,
reservedMem: int64(rhs - lhs),
}
// the permit of first chunk has acquired
if chunkID != 0 && !p.canAddToQueue(nsr) {
p.releaseSemaphoreAndMem(int64(uncompressedSize - lhs))
return
}
p.internalSingleSend(mm, compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize))
}
// close the blockCh when all the chunks acquired permits
request.stopBlock()
} else {
// close the blockCh when totalChunks is 1 (it has acquired permits)
request.stopBlock()
p.internalSingleSend(mm, compressedPayload, request, uint32(maxMessageSize))
}
} else {
smm := p.genSingleMessageMetadataInBatch(msg, uncompressedSize)
multiSchemaEnabled := !p.options.DisableMultiSchema
added := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion,
multiSchemaEnabled)
if !added {
// The current batch is full. flush it and retry
p.internalFlushCurrentBatch()
// after flushing try again to add the current payload
if ok := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion,
multiSchemaEnabled); !ok {
p.releaseSemaphoreAndMem(int64(uncompressedSize))
runCallback(request.callback, nil, request.msg, errFailAddToBatch)
p.log.WithField("size", uncompressedSize).
WithField("properties", msg.Properties).
Error("unable to add message to batch")
return
}
}
if request.flushImmediately {
p.internalFlushCurrentBatch()
}
}
}