func()

in pulsar/producer_partition.go [1198:1286]


// ReceivedSendReceipt processes a send receipt (ack) from the broker by
// comparing its sequence ID with the item at the head of the pending queue.
//
// Outcomes:
//   - The pending queue has no *pendingItem at its head: the receipt is logged
//     and dropped.
//   - The receipt's sequence ID is greater than the head item's: the connection
//     is closed so that messages can be re-transmitted on a new connection.
//   - The receipt's sequence ID is smaller than the head item's: the receipt is
//     logged and ignored; the connection is left open.
//   - The sequence IDs match: the head item is removed from the queue and, for
//     every send request in it, lastSequenceID is stored, the reservation is
//     released via releaseSemaphoreAndMem, metrics are updated, the callback
//     and interceptors are invoked (for chunked messages only on the last
//     chunk), and any attached transaction is notified. Finally the pending
//     item is completed with a nil error.
func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
	pi, ok := p.pendingQueue.Peek().(*pendingItem)
	if !ok {
		// if we receive a receipt although the pending queue is empty, the state of the broker and the producer differs.
		p.log.Warnf("Got ack %v for timed out msg", response.GetMessageId())
		return
	}

	if pi.sequenceID < response.GetSequenceId() {
		// Force connection closing so that messages can be re-transmitted in a new connection
		p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, closing connection", response.GetMessageId(),
			response.GetSequenceId(), pi.sequenceID)
		p._getConn().Close()
		return
	}

	if pi.sequenceID > response.GetSequenceId() {
		// Ignoring the ack since it's referring to a message that has already timed out.
		// The connection is intentionally left open here, so the log must not claim otherwise.
		p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, ignoring the ack", response.GetMessageId(),
			response.GetSequenceId(), pi.sequenceID)
		return
	}

	// The ack was indeed for the expected item in the queue, we can remove it and trigger the callback
	p.pendingQueue.Poll()

	now := time.Now().UnixNano()

	// The broker-assigned position is the same for every request in this
	// pending item, so read it once up front.
	ledgerID := int64(response.GetMessageId().GetLedgerId())
	entryID := int64(response.GetMessageId().GetEntryId())

	// lock the pending item while sending the requests
	pi.Lock()
	defer pi.Unlock()
	p.metrics.PublishRPCLatency.Observe(float64(now-pi.sentAt.UnixNano()) / 1.0e9)
	batchSize := int32(len(pi.sendRequests))
	for idx, i := range pi.sendRequests {
		sr := i.(*sendRequest)
		atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID))
		p.releaseSemaphoreAndMem(sr.reservedMem)
		p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
		p.metrics.MessagesPublished.Inc()
		p.metrics.MessagesPending.Dec()
		payloadSize := float64(len(sr.msg.Payload))
		p.metrics.BytesPublished.Add(payloadSize)
		p.metrics.BytesPending.Sub(payloadSize)

		if sr.callback != nil || len(p.options.Interceptors) > 0 {
			msgID := newMessageID(
				ledgerID,
				entryID,
				int32(idx),
				p.partitionIdx,
				batchSize,
			)

			if sr.totalChunks > 1 {
				if sr.chunkID == 0 {
					// First chunk: record its position in the chunk recorder.
					sr.chunkRecorder.setFirstChunkID(
						&messageID{
							ledgerID,
							entryID,
							-1,
							p.partitionIdx,
							0,
						})
				} else if sr.chunkID == sr.totalChunks-1 {
					// Last chunk: record its position and report the chunked
					// message ID to the caller instead of the per-entry ID.
					sr.chunkRecorder.setLastChunkID(
						&messageID{
							ledgerID,
							entryID,
							-1,
							p.partitionIdx,
							0,
						})
					// use chunkMsgID to set msgID
					msgID = &sr.chunkRecorder.chunkedMsgID
				}
			}

			// Notify only once per logical message: either it is not chunked,
			// or this is the final chunk.
			if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 {
				runCallback(sr.callback, msgID, sr.msg, nil)
				p.options.Interceptors.OnSendAcknowledgement(p, sr.msg, msgID)
			}
		}
		if sr.transaction != nil {
			sr.transaction.endSendOrAckOp(nil)
		}
	}

	// Mark this pending item as done
	pi.Complete(nil)
}