func()

in pulsar/producer_partition.go [607:689]


// internalSend routes a single send request down one of three paths:
//
//   - batched: the message is appended to the current batch; when the append
//     is refused the batch is flushed and the append is attempted once more.
//     A second refusal completes the request with ErrFailAddToBatch.
//   - single: a request with at most one chunk is passed to internalSingleSend
//     together with its full compressed payload.
//   - chunked: the compressed payload is cut into sr.totalChunks slices of at
//     most sr.payloadChunkSize bytes, and each slice is sent through its own
//     sendRequest taken from sendRequestPool.
func (p *partitionProducer) internalSend(sr *sendRequest) {
	p.log.Debug("Received send request: ", *sr.msg)

	if sr.sendAsBatch {
		singleMeta := p.genSingleMessageMetadataInBatch(sr.msg, int(sr.uncompressedSize))
		multiSchema := !p.options.DisableMultiSchema

		// tryAdd attempts to append the request to the current batch.
		tryAdd := func() bool {
			return addRequestToBatch(
				singleMeta, p, sr.uncompressedPayload, sr, sr.msg, sr.deliverAt, sr.schemaVersion, multiSchema)
		}

		if !tryAdd() {
			// The batch refused the message: flush it and give the
			// message one more chance.
			p.internalFlushCurrentBatch()

			if !tryAdd() {
				entry := p.log.WithField("size", sr.uncompressedSize)
				entry = entry.WithField("properties", sr.msg.Properties)
				entry.Error("unable to add message to batch")
				sr.done(nil, ErrFailAddToBatch)
				return
			}
		}

		if sr.flushImmediately {
			p.internalFlushCurrentBatch()
		}
		return
	}

	if sr.totalChunks <= 1 {
		p.internalSingleSend(sr.mm, sr.compressedPayload, sr, uint32(sr.maxMessageSize))
		return
	}

	// Chunked path: every chunk carries the same uuid, built from the
	// producer name and the message sequence id.
	seqID := strconv.FormatUint(*sr.mm.SequenceId, 10)
	chunkUUID := fmt.Sprintf("%s-%s", p.producerName, seqID)
	sr.mm.Uuid = proto.String(chunkUUID)
	sr.mm.NumChunksFromMsg = proto.Int32(int32(sr.totalChunks))
	sr.mm.TotalChunkMsgSize = proto.Int32(int32(sr.compressedSize))
	recorder := newChunkRecorder()

	for idx := 0; idx < sr.totalChunks; idx++ {
		// Byte range [start, end) of this chunk inside the compressed
		// payload; the final chunk is clamped to the payload size.
		start := idx * sr.payloadChunkSize
		end := start + sr.payloadChunkSize
		if end > sr.compressedSize {
			end = sr.compressedSize
		}

		// The metadata object is shared by all chunks, so its chunk id is
		// rewritten right before each chunk is sent.
		sr.mm.ChunkId = proto.Int32(int32(idx))

		chunkReq := sendRequestPool.Get().(*sendRequest)
		*chunkReq = sendRequest{
			// Fields specific to this chunk request.
			pool:          sendRequestPool,
			chunkID:       idx,
			totalChunks:   sr.totalChunks,
			uuid:          chunkUUID,
			chunkRecorder: recorder,
			reservedMem:   int64(end - start),

			// Fields copied as-is from the parent request.
			ctx:                 sr.ctx,
			msg:                 sr.msg,
			producer:            sr.producer,
			callback:            sr.callback,
			callbackOnce:        sr.callbackOnce,
			publishTime:         sr.publishTime,
			flushImmediately:    sr.flushImmediately,
			transaction:         sr.transaction,
			memLimit:            sr.memLimit,
			semaphore:           sr.semaphore,
			sendAsBatch:         sr.sendAsBatch,
			schema:              sr.schema,
			schemaVersion:       sr.schemaVersion,
			uncompressedPayload: sr.uncompressedPayload,
			uncompressedSize:    sr.uncompressedSize,
			compressedPayload:   sr.compressedPayload,
			compressedSize:      sr.compressedSize,
			payloadChunkSize:    sr.payloadChunkSize,
			mm:                  sr.mm,
			deliverAt:           sr.deliverAt,
			maxMessageSize:      sr.maxMessageSize,
		}

		p.internalSingleSend(
			chunkReq.mm, chunkReq.compressedPayload[start:end], chunkReq, uint32(chunkReq.maxMessageSize))
	}
}