in pulsar/producer_partition.go [1610:1664]
func (sr *sendRequest) done(msgID MessageID, err error) {
if err == nil {
sr.producer.metrics.PublishLatency.Observe(float64(time.Now().UnixNano()-sr.publishTime.UnixNano()) / 1.0e9)
sr.producer.metrics.MessagesPublished.Inc()
sr.producer.metrics.BytesPublished.Add(float64(sr.reservedMem))
if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 {
if sr.producer.options.Interceptors != nil {
sr.producer.options.Interceptors.OnSendAcknowledgement(sr.producer, sr.msg, msgID)
}
}
}
if err != nil {
sr.producer.log.WithError(err).
WithField("size", sr.reservedMem).
WithField("properties", sr.msg.Properties)
}
if errors.Is(err, ErrSendTimeout) {
sr.producer.metrics.PublishErrorsTimeout.Inc()
}
if errors.Is(err, ErrMessageTooLarge) {
sr.producer.metrics.PublishErrorsMsgTooLarge.Inc()
}
if sr.semaphore != nil {
sr.semaphore.Release()
sr.producer.metrics.MessagesPending.Dec()
}
if sr.memLimit != nil {
sr.memLimit.ReleaseMemory(sr.reservedMem)
sr.producer.metrics.BytesPending.Sub(float64(sr.reservedMem))
}
// sr.chunkID == -1 means a chunked message is not yet prepared, so that we should fail it immediately
if sr.totalChunks <= 1 || sr.chunkID == -1 || sr.chunkID == sr.totalChunks-1 {
sr.callbackOnce.Do(func() {
runCallback(sr.callback, msgID, sr.msg, err)
})
if sr.transaction != nil {
sr.transaction.endSendOrAckOp(err)
}
}
pool := sr.pool
if pool != nil {
// reset all the fields
*sr = sendRequest{}
pool.Put(sr)
}
}