in pulsar/producer_partition.go [1292:1356]
func (p *partitionProducer) internalSendAsync(
ctx context.Context,
msg *ProducerMessage,
callback func(MessageID, *ProducerMessage, error),
flushImmediately bool,
) {
if err := p.validateMsg(msg); err != nil {
p.log.Error(err)
runCallback(callback, nil, msg, err)
return
}
sr := sendRequestPool.Get().(*sendRequest)
*sr = sendRequest{
pool: sendRequestPool,
ctx: ctx,
msg: msg,
producer: p,
callback: callback,
callbackOnce: &sync.Once{},
flushImmediately: flushImmediately,
publishTime: time.Now(),
chunkID: -1,
}
if err := p.prepareTransaction(sr); err != nil {
sr.done(nil, err)
return
}
if p.getProducerState() != producerReady {
sr.done(nil, ErrProducerClosed)
return
}
p.options.Interceptors.BeforeSend(p, msg)
if err := p.updateSchema(sr); err != nil {
p.log.Error(err)
sr.done(nil, err)
return
}
if err := p.updateUncompressedPayload(sr); err != nil {
p.log.Error(err)
sr.done(nil, err)
return
}
p.updateMetaData(sr)
if err := p.updateChunkInfo(sr); err != nil {
p.log.Error(err)
sr.done(nil, err)
return
}
if err := p.reserveResources(sr); err != nil {
p.log.Error(err)
sr.done(nil, err)
return
}
p.dataChan <- sr
}