func()

in pulsar/producer_partition.go [481:707]


func (p *partitionProducer) internalSend(request *sendRequest) {
	p.log.Debug("Received send request: ", *request.msg)

	msg := request.msg

	// read payload from message
	uncompressedPayload := msg.Payload

	var schemaPayload []byte
	var err error

	// the block channel must be closed whenever we return with an error
	defer request.stopBlock()
	if !p.canAddToQueue(request) {
		return
	}

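	// when multi-schema is disabled, reject messages whose schema differs from the producer's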
	if p.options.DisableMultiSchema {
		if msg.Schema != nil && p.options.Schema != nil &&
			msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
			err = fmt.Errorf("msg schema does not match the producer schema")
			runCallback(request.callback, nil, request.msg, err)
			p.log.WithError(err).Errorf("The producer %s of topic %s has `MultiSchema` disabled", p.producerName, p.topic)
			return
		}
	}
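	// resolve the effective schema: a per-message schema takes precedence over the producer's schema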
	var schema Schema
	var schemaVersion []byte
	if msg.Schema != nil {
		schema = msg.Schema
	} else if p.options.Schema != nil {
		schema = p.options.Schema
	}
	if msg.Value != nil {
		// payload and schema are mutually exclusive
		// try to get payload from schema value only if payload is not set
		if uncompressedPayload == nil && schema != nil {
			schemaPayload, err = schema.Encode(msg.Value)
			if err != nil {
				runCallback(request.callback, nil, request.msg, newError(SchemaFailure, err.Error()))
				p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value)
				return
			}
		}
	}
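	// fall back to the schema-encoded payload when the message carried no raw payload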
	if uncompressedPayload == nil {
		uncompressedPayload = schemaPayload
	}

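	// resolve the schema version: check the local cache first; on a miss, register the schema with the broker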
	if schema != nil {
		schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo())
		if schemaVersion == nil {
			schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
			if err != nil {
				p.log.WithError(err).Error("get schema version fail")
				runCallback(request.callback, nil, request.msg, fmt.Errorf("get schema version fail, err: %w", err))
				return
			}
			p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
		}
	}

	uncompressedSize := len(uncompressedPayload)

	// try to reserve memory for uncompressedPayload
	if !p.canReserveMem(request, int64(uncompressedSize)) {
		return
	}

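	// resolve the delivery time: DeliverAfter is relative to now and takes precedence over the absolute DeliverAt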
	deliverAt := msg.DeliverAt
	if msg.DeliverAfter.Nanoseconds() > 0 {
		deliverAt = time.Now().Add(msg.DeliverAfter)
	}

	mm := p.genMetadata(msg, uncompressedSize, deliverAt)

	// set default ReplicationClusters when DisableReplication
	if msg.DisableReplication {
		msg.ReplicationClusters = []string{"__local__"}
	}

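	// batch only when batching is enabled and the message has no replication-cluster override and no delayed delivery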
	sendAsBatch := !p.options.DisableBatching &&
		msg.ReplicationClusters == nil &&
		deliverAt.UnixNano() < 0

	// when the message will be batched, blockCh can be closed early to unblock the caller
	if sendAsBatch {
		request.stopBlock()
	} else {
		// update the sequence ID in the metadata so the computed msgMetadata size is accurate;
		// batched sends update the sequence ID in the BatchBuilder instead
		p.updateMetadataSeqID(mm, msg)
	}

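	// the max message size is advertised by the broker on this connection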
	maxMessageSize := int(p._getConn().GetMaxMessageSize())

	// compress payload if not batching
	var compressedPayload []byte
	var compressedSize int
	var checkSize int
	if !sendAsBatch {
		compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
		compressedSize = len(compressedPayload)
		checkSize = compressedSize

		// set the compression type in msgMetadata
		compressionType := pb.CompressionType(p.options.CompressionType)
		if compressionType != pb.CompressionType_NONE {
			mm.Compression = &compressionType
		}
	} else {
		// the final size check for batched messages happens in serializeMessage;
		// this is only a preliminary check
		checkSize = uncompressedSize
	}

	// if msg is too large and chunking is disabled
	if checkSize > maxMessageSize && !p.options.EnableChunking {
		p.releaseSemaphoreAndMem(int64(uncompressedSize))
		runCallback(request.callback, nil, request.msg, errMessageTooLarge)
		p.log.WithError(errMessageTooLarge).
			WithField("size", checkSize).
			WithField("properties", msg.Properties).
			Errorf("MaxMessageSize %d", maxMessageSize)
		p.metrics.PublishErrorsMsgTooLarge.Inc()
		return
	}

	var totalChunks int
	// max chunk payload size
	var payloadChunkSize int
	if sendAsBatch || !p.options.EnableChunking {
		totalChunks = 1
		payloadChunkSize = int(p._getConn().GetMaxMessageSize())
	} else {
		payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - proto.Size(mm)
		if payloadChunkSize <= 0 {
			p.releaseSemaphoreAndMem(int64(uncompressedSize))
			runCallback(request.callback, nil, msg, errMetaTooLarge)
			p.log.WithError(errMetaTooLarge).
				WithField("metadata size", proto.Size(mm)).
				WithField("properties", msg.Properties).
				Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
			p.metrics.PublishErrorsMsgTooLarge.Inc()
			return
		}
		// cap the chunk size at ChunkMaxMessageSize when configured
		if p.options.ChunkMaxMessageSize != 0 {
			payloadChunkSize = int(math.Min(float64(payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
		}
		totalChunks = int(math.Max(1, math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
	}

	// record the total chunk count on the send request
	request.totalChunks = totalChunks

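	// non-batched path: send as a single frame, or as a sequence of chunks when chunking applies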
	if !sendAsBatch {
		if totalChunks > 1 {
			var lhs, rhs int
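			// every chunk of the message shares the same uuid so the consumer can reassemble them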
			uuid := fmt.Sprintf("%s-%s", p.producerName, strconv.FormatUint(*mm.SequenceId, 10))
			mm.Uuid = proto.String(uuid)
			mm.NumChunksFromMsg = proto.Int32(int32(totalChunks))
			mm.TotalChunkMsgSize = proto.Int32(int32(compressedSize))
			cr := newChunkRecorder()
			for chunkID := 0; chunkID < totalChunks; chunkID++ {
				lhs = chunkID * payloadChunkSize
				if rhs = lhs + payloadChunkSize; rhs > compressedSize {
					rhs = compressedSize
				}
				// update chunk id
				mm.ChunkId = proto.Int32(int32(chunkID))
				nsr := &sendRequest{
					ctx:              request.ctx,
					msg:              request.msg,
					callback:         request.callback,
					callbackOnce:     request.callbackOnce,
					publishTime:      request.publishTime,
					blockCh:          request.blockCh,
					closeBlockChOnce: request.closeBlockChOnce,
					totalChunks:      totalChunks,
					chunkID:          chunkID,
					uuid:             uuid,
					chunkRecorder:    cr,
					transaction:      request.transaction,
					reservedMem:      int64(rhs - lhs),
				}
				// the permit for the first chunk was already acquired above
				if chunkID != 0 && !p.canAddToQueue(nsr) {
					p.releaseSemaphoreAndMem(int64(uncompressedSize - lhs))
					return
				}
				p.internalSingleSend(mm, compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize))
			}
			// close blockCh once all chunks have acquired permits
			request.stopBlock()
		} else {
			// close blockCh when totalChunks is 1 (its permit was already acquired)
			request.stopBlock()
			p.internalSingleSend(mm, compressedPayload, request, uint32(maxMessageSize))
		}
	} else {
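		// batched path: append to the current batch, flushing and retrying once if the batch is full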
		smm := p.genSingleMessageMetadataInBatch(msg, uncompressedSize)
		multiSchemaEnabled := !p.options.DisableMultiSchema
		added := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion,
			multiSchemaEnabled)
		if !added {
			// the current batch is full: flush it and retry
			p.internalFlushCurrentBatch()
			// after flushing try again to add the current payload
			if ok := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion,
				multiSchemaEnabled); !ok {
				p.releaseSemaphoreAndMem(int64(uncompressedSize))
				runCallback(request.callback, nil, request.msg, errFailAddToBatch)
				p.log.WithField("size", uncompressedSize).
					WithField("properties", msg.Properties).
					Error("unable to add message to batch")
				return
			}
		}
		if request.flushImmediately {
			p.internalFlushCurrentBatch()
		}
	}
}
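
A minimal usage sketch (the URL and topic below are placeholders) showing a producer configured so internalSend takes the chunking path above. Chunking only applies on the non-batched path, so DisableBatching must be set together with EnableChunking:

	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650", // placeholder
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic:               "my-topic", // placeholder
		DisableBatching:     true,       // chunking requires the non-batched path
		EnableChunking:      true,       // split payloads larger than the broker's max message size
		ChunkMaxMessageSize: 1 << 20,    // optional: cap each chunk at 1 MiB
	})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

With this configuration, a payload above the broker's max message size is compressed once, split into ceil(compressedSize/payloadChunkSize) chunks, and each chunk is sent via internalSingleSend with a shared uuid and per-chunk ChunkId.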