in pulsar/producer_partition.go [1358:1431]
// ReceivedSendReceipt handles a send receipt (ack) delivered by the broker.
//
// The receipt is matched against the item at the head of the pending queue:
//   - if the head is not a *pendingItem (for example, the queue is empty), the
//     receipt is logged and dropped;
//   - if the receipt's sequence ID is ahead of the head item, the connection is
//     closed so that messages can be re-transmitted in a new connection;
//   - if the receipt's sequence ID is behind the head item, the receipt is
//     ignored as referring to a message that has already timed out;
//   - otherwise the head item is removed from the queue, the publish RPC
//     latency is observed, and every send request held by the item is
//     completed with its message ID before the item itself is marked done.
func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
	pi, ok := p.pendingQueue.Peek().(*pendingItem)
	if !ok {
		// if we receive a receipt although the pending queue is empty, the state of the broker and the producer differs.
		p.log.Warnf("Got ack %v for timed out msg", response.GetMessageId())
		return
	}

	if pi.sequenceID < response.GetSequenceId() {
		// Force connection closing so that messages can be re-transmitted in a new connection
		p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, local < remote, closing connection",
			response.GetMessageId(), response.GetSequenceId(), pi.sequenceID)
		p._getConn().Close()
		return
	}

	if pi.sequenceID > response.GetSequenceId() {
		// Ignoring the ack since it's referring to a message that has already timed out.
		p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, local > remote, ignore it",
			response.GetMessageId(), response.GetSequenceId(), pi.sequenceID)
		return
	}

	// The ack was indeed for the expected item in the queue, we can remove it and trigger the callback
	p.pendingQueue.Poll()

	// Capture the receive time before waiting on the item lock so that lock
	// contention is not counted as RPC latency.
	now := time.Now()

	// lock the pending item while sending the requests
	pi.Lock()
	defer pi.Unlock()

	// Time.Sub uses the monotonic clock reading when both values carry one, so
	// the observed latency is not skewed by wall-clock adjustments.
	p.metrics.PublishRPCLatency.Observe(now.Sub(pi.sentAt).Seconds())

	// The sequence ID is the same for every request in the item, so it is
	// recorded once per receipt rather than once per request.
	atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID))

	// The ledger and entry IDs are shared by every message in the batch.
	ackedID := response.GetMessageId()
	ledgerID := int64(ackedID.GetLedgerId())
	entryID := int64(ackedID.GetEntryId())

	batchSize := int32(len(pi.sendRequests))
	for idx, i := range pi.sendRequests {
		sr := i.(*sendRequest)

		msgID := newMessageID(ledgerID, entryID, int32(idx), p.partitionIdx, batchSize)

		if sr.totalChunks > 1 {
			switch sr.chunkID {
			case 0:
				sr.chunkRecorder.setFirstChunkID(
					&messageID{ledgerID, entryID, -1, p.partitionIdx, 0})
			case sr.totalChunks - 1:
				sr.chunkRecorder.setLastChunkID(
					&messageID{ledgerID, entryID, -1, p.partitionIdx, 0})
				// use chunkMsgID to set msgID
				msgID = &sr.chunkRecorder.chunkedMsgID
			}
		}

		sr.done(msgID, nil)
	}

	// Mark this pending item as done
	pi.done(nil)
}