in pulsar/producer_partition.go [1198:1286]
// ReceivedSendReceipt handles a send receipt (ack) received from the broker.
//
// The receipt is compared against the item at the head of the pending queue:
//   - no pending item: the receipt is logged and dropped;
//   - receipt sequence ID ahead of the head item: the connection is closed so
//     that messages can be re-transmitted on a new connection;
//   - receipt sequence ID behind the head item: the receipt is ignored, since
//     it refers to a message that has already timed out;
//   - sequence IDs equal: the head item is removed from the queue, metrics are
//     updated, releaseSemaphoreAndMem is called for each send request, and the
//     send callback and interceptors are invoked.
//
// For chunked messages (totalChunks > 1) the callback and interceptors run
// only when the last chunk is acknowledged.
func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
	pi, ok := p.pendingQueue.Peek().(*pendingItem)
	if !ok {
		// if we receive a receipt although the pending queue is empty, the state of the broker and the producer differs.
		p.log.Warnf("Got ack %v for timed out msg", response.GetMessageId())
		return
	}

	if pi.sequenceID < response.GetSequenceId() {
		// Force connection closing so that messages can be re-transmitted in a new connection
		p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, closing connection", response.GetMessageId(),
			response.GetSequenceId(), pi.sequenceID)
		p._getConn().Close()
		return
	}

	if pi.sequenceID > response.GetSequenceId() {
		// Ignoring the ack since it's referring to a message that has already timed out.
		// The connection is intentionally left open in this case.
		p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, ignoring it", response.GetMessageId(),
			response.GetSequenceId(), pi.sequenceID)
		return
	}

	// The ack was indeed for the expected item in the queue, we can remove it and trigger the callback
	p.pendingQueue.Poll()

	now := time.Now().UnixNano()

	// lock the pending item while sending the requests
	pi.Lock()
	defer pi.Unlock()

	p.metrics.PublishRPCLatency.Observe(float64(now-pi.sentAt.UnixNano()) / 1.0e9)

	// Every request in this pending item shares the ledger/entry of the receipt.
	ledgerID := int64(response.GetMessageId().GetLedgerId())
	entryID := int64(response.GetMessageId().GetEntryId())
	batchSize := int32(len(pi.sendRequests))

	for idx, i := range pi.sendRequests {
		sr := i.(*sendRequest)

		// Record the sequence ID that has just been acknowledged.
		atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID))
		p.releaseSemaphoreAndMem(sr.reservedMem)

		p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
		p.metrics.MessagesPublished.Inc()
		p.metrics.MessagesPending.Dec()
		payloadSize := float64(len(sr.msg.Payload))
		p.metrics.BytesPublished.Add(payloadSize)
		p.metrics.BytesPending.Sub(payloadSize)

		if sr.callback != nil || len(p.options.Interceptors) > 0 {
			msgID := newMessageID(
				ledgerID,
				entryID,
				int32(idx),
				p.partitionIdx,
				batchSize,
			)

			if sr.totalChunks > 1 {
				if sr.chunkID == 0 {
					// First chunk: remember its position in the chunk recorder.
					sr.chunkRecorder.setFirstChunkID(
						&messageID{
							ledgerID,
							entryID,
							-1,
							p.partitionIdx,
							0,
						})
				} else if sr.chunkID == sr.totalChunks-1 {
					// Last chunk: remember its position as well.
					sr.chunkRecorder.setLastChunkID(
						&messageID{
							ledgerID,
							entryID,
							-1,
							p.partitionIdx,
							0,
						})
					// use chunkMsgID to set msgID
					msgID = &sr.chunkRecorder.chunkedMsgID
				}
			}

			// Non-chunked messages are reported immediately; chunked messages
			// only once their last chunk has been acknowledged.
			if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 {
				runCallback(sr.callback, msgID, sr.msg, nil)
				p.options.Interceptors.OnSendAcknowledgement(p, sr.msg, msgID)
			}
		}

		if sr.transaction != nil {
			sr.transaction.endSendOrAckOp(nil)
		}
	}

	// Mark this pending item as done
	pi.Complete(nil)
}