func()

in pulsar/producer_partition.go [1610:1664]


// done finishes this send request (for a chunked message: this chunk).
//
// On success it records the publish metrics and, for a non-chunked message or
// the last chunk, notifies the configured interceptors. On failure it logs the
// error and bumps the matching error counters. In both cases it then releases
// the semaphore permit and the reserved memory when those are set, runs the
// callback through callbackOnce, notifies the attached transaction, and, when
// the request came from a pool, zeroes the request and puts it back.
//
// msgID is handed to the interceptors and to the callback. err is nil on
// success. When sr.pool is non-nil, sr must not be used after done returns,
// because the struct is reset and returned to the pool.
func (sr *sendRequest) done(msgID MessageID, err error) {
	if err == nil {
		// Latency is reported in seconds (nanosecond difference / 1e9).
		sr.producer.metrics.PublishLatency.Observe(float64(time.Now().UnixNano()-sr.publishTime.UnixNano()) / 1.0e9)
		sr.producer.metrics.MessagesPublished.Inc()
		sr.producer.metrics.BytesPublished.Add(float64(sr.reservedMem))

		// Interceptors are notified once per message: either the message is
		// not chunked, or this is its last chunk.
		if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 {
			if sr.producer.options.Interceptors != nil {
				sr.producer.options.Interceptors.OnSendAcknowledgement(sr.producer, sr.msg, msgID)
			}
		}
	}

	if err != nil {
		// The With* calls only build a log entry; the terminal Error call is
		// what actually emits it. Without it the failure was silently dropped.
		sr.producer.log.WithError(err).
			WithField("size", sr.reservedMem).
			WithField("properties", sr.msg.Properties).
			Error("Failed to publish message")
	}

	// errors.Is reports false for a nil err, so these are no-ops on success.
	if errors.Is(err, ErrSendTimeout) {
		sr.producer.metrics.PublishErrorsTimeout.Inc()
	}

	if errors.Is(err, ErrMessageTooLarge) {
		sr.producer.metrics.PublishErrorsMsgTooLarge.Inc()
	}

	// Give back the pending-message permit, if this request holds one.
	if sr.semaphore != nil {
		sr.semaphore.Release()
		sr.producer.metrics.MessagesPending.Dec()
	}

	// Give back the memory reserved for this request, if a limiter is set.
	if sr.memLimit != nil {
		sr.memLimit.ReleaseMemory(sr.reservedMem)
		sr.producer.metrics.BytesPending.Sub(float64(sr.reservedMem))
	}

	// sr.chunkID == -1 means the chunked message has not been prepared yet, so
	// it must be failed immediately; otherwise only a non-chunked message or
	// the last chunk completes the user-visible send.
	if sr.totalChunks <= 1 || sr.chunkID == -1 || sr.chunkID == sr.totalChunks-1 {
		sr.callbackOnce.Do(func() {
			runCallback(sr.callback, msgID, sr.msg, err)
		})

		if sr.transaction != nil {
			sr.transaction.endSendOrAckOp(err)
		}
	}

	// Capture the pool before the reset below wipes sr.pool.
	pool := sr.pool
	if pool != nil {
		// reset all the fields
		*sr = sendRequest{}
		pool.Put(sr)
	}
}