func()

in pulsar/producer_partition.go [1358:1431]


// ReceivedSendReceipt handles a send receipt from the broker by comparing its
// sequence ID with the item at the head of the pending queue.
//
//   - No pending item at the head: the receipt is logged and dropped.
//   - Receipt sequence ID greater than the head's: the connection is closed.
//   - Receipt sequence ID lower than the head's: the receipt is logged and
//     ignored.
//   - Equal sequence IDs: the head item is removed from the queue, the publish
//     RPC latency is observed, every send request held by the item is completed
//     with its message ID, and the item itself is marked done.
func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
	pi, ok := p.pendingQueue.Peek().(*pendingItem)
	if !ok {
		// if we receive a receipt although the pending queue is empty, the state of the broker and the producer differs.
		p.log.Warnf("Got ack %v for timed out msg", response.GetMessageId())
		return
	}

	ackedSeq := response.GetSequenceId()
	switch {
	case pi.sequenceID < ackedSeq:
		// Force connection closing so that messages can be re-transmitted in a new connection
		p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, local < remote, closing connection",
			response.GetMessageId(), ackedSeq, pi.sequenceID)
		p._getConn().Close()
		return
	case pi.sequenceID > ackedSeq:
		// Ignoring the ack since it's referring to a message that has already timed out.
		p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, local > remote, ignore it",
			response.GetMessageId(), ackedSeq, pi.sequenceID)
		return
	}

	// The ack was indeed for the expected item in the queue, we can remove it and trigger the callback
	p.pendingQueue.Poll()

	// Captured before taking the item lock so that time spent waiting for the
	// lock is not included in the observed latency.
	receivedAt := time.Now()

	// lock the pending item while sending the requests
	pi.Lock()
	defer pi.Unlock()

	// Time.Sub uses the monotonic clock reading when both values carry one, so
	// a wall-clock adjustment between send and ack cannot skew the measured
	// latency or make it negative.
	p.metrics.PublishRPCLatency.Observe(receivedAt.Sub(pi.sentAt).Seconds())

	// The ledger and entry IDs are the same for every request in the item, so
	// they are extracted once through the nil-safe getters.
	ackedMsgID := response.GetMessageId()
	ledgerID := int64(ackedMsgID.GetLedgerId())
	entryID := int64(ackedMsgID.GetEntryId())
	batchSize := int32(len(pi.sendRequests))

	for idx, i := range pi.sendRequests {
		sr := i.(*sendRequest)
		atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID))

		// The position of the request inside the item is passed as the third
		// argument, followed by the partition index and the item's request count.
		msgID := newMessageID(ledgerID, entryID, int32(idx), p.partitionIdx, batchSize)

		if sr.totalChunks > 1 {
			// Only the first and last chunks are recorded; receipts for the
			// chunks in between leave the recorder untouched.
			switch sr.chunkID {
			case 0:
				sr.chunkRecorder.setFirstChunkID(
					&messageID{ledgerID, entryID, -1, p.partitionIdx, 0})
			case sr.totalChunks - 1:
				sr.chunkRecorder.setLastChunkID(
					&messageID{ledgerID, entryID, -1, p.partitionIdx, 0})
				// use chunkMsgID to set msgID
				msgID = &sr.chunkRecorder.chunkedMsgID
			}
		}

		sr.done(msgID, nil)
	}

	// Mark this pending item as done
	pi.done(nil)
}