func()

in pulsar/producer_partition.go [1292:1356]


func (p *partitionProducer) internalSendAsync(
	ctx context.Context,
	msg *ProducerMessage,
	callback func(MessageID, *ProducerMessage, error),
	flushImmediately bool,
) {
	if err := p.validateMsg(msg); err != nil {
		p.log.Error(err)
		runCallback(callback, nil, msg, err)
		return
	}

	sr := sendRequestPool.Get().(*sendRequest)
	*sr = sendRequest{
		pool:             sendRequestPool,
		ctx:              ctx,
		msg:              msg,
		producer:         p,
		callback:         callback,
		callbackOnce:     &sync.Once{},
		flushImmediately: flushImmediately,
		publishTime:      time.Now(),
		chunkID:          -1,
	}

	if err := p.prepareTransaction(sr); err != nil {
		sr.done(nil, err)
		return
	}

	if p.getProducerState() != producerReady {
		sr.done(nil, ErrProducerClosed)
		return
	}

	p.options.Interceptors.BeforeSend(p, msg)

	if err := p.updateSchema(sr); err != nil {
		p.log.Error(err)
		sr.done(nil, err)
		return
	}

	if err := p.updateUncompressedPayload(sr); err != nil {
		p.log.Error(err)
		sr.done(nil, err)
		return
	}

	p.updateMetaData(sr)

	if err := p.updateChunkInfo(sr); err != nil {
		p.log.Error(err)
		sr.done(nil, err)
		return
	}

	if err := p.reserveResources(sr); err != nil {
		p.log.Error(err)
		sr.done(nil, err)
		return
	}

	p.dataChan <- sr
}