in pulsar/producer_partition.go [1124:1196]
// internalSendAsync validates msg, sets up transaction bookkeeping when the
// message belongs to a transaction, and enqueues a sendRequest on p.dataChan.
//
// Nothing is returned; every rejection is reported through callback (invoked
// via runCallback) with a nil MessageID:
//   - msg is nil, or both msg.Value and msg.Payload are set (InvalidMessage);
//   - msg.Transaction is set but is not a non-nil *transaction (InvalidMessage);
//   - the transaction is not in the TxnOpen state (InvalidStatus);
//   - registering the topic or the send operation with the transaction fails
//     (the registration error is passed through unchanged);
//   - the producer state is not producerReady (errProducerClosed).
//
// When a transaction is attached, the callback stored in the request is
// wrapped so that endSendOrAckOp is called on the transaction after the
// user's callback has run.
//
// Unless p.options.DisableBlockIfQueueFull is set, the call blocks after
// enqueueing until a receive on the request's blockCh completes.
// NOTE(review): blockCh is presumably closed by the consumer of dataChan
// through closeBlockChOnce — confirm against the event loop.
func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
	callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
	if msg == nil {
		p.log.Error("Message is nil")
		runCallback(callback, nil, msg, newError(InvalidMessage, "Message is nil"))
		return
	}

	if msg.Value != nil && msg.Payload != nil {
		p.log.Error("Can not set Value and Payload both")
		runCallback(callback, nil, msg, newError(InvalidMessage, "Can not set Value and Payload both"))
		return
	}

	// Register transaction operation to transaction and the transaction coordinator.
	// Without a transaction the user's callback is used as-is.
	newCallback := callback
	var txn *transaction
	if msg.Transaction != nil {
		// Use the comma-ok form: msg.Transaction is an interface value, and a
		// foreign implementation (or a typed-nil *transaction) must be reported
		// through the callback instead of panicking inside the client.
		transactionImpl, ok := msg.Transaction.(*transaction)
		if !ok || transactionImpl == nil {
			p.log.Error("Message carries an unsupported Transaction implementation")
			runCallback(callback, nil, msg,
				newError(InvalidMessage, "Message carries an unsupported Transaction implementation"))
			return
		}
		txn = transactionImpl

		if transactionImpl.state != TxnOpen {
			p.log.WithField("state", transactionImpl.state).Error("Failed to send message" +
				" by a non-open transaction.")
			runCallback(callback, nil, msg, newError(InvalidStatus, "Failed to send message by a non-open transaction."))
			return
		}

		if err := transactionImpl.registerProducerTopic(p.topic); err != nil {
			runCallback(callback, nil, msg, err)
			return
		}
		if err := transactionImpl.registerSendOrAckOp(); err != nil {
			runCallback(callback, nil, msg, err)
			return
		}

		// From here on the send operation is registered, so every completion
		// path (including the closed-producer check below) must go through
		// newCallback to balance it with endSendOrAckOp.
		newCallback = func(id MessageID, producerMessage *ProducerMessage, err error) {
			runCallback(callback, id, producerMessage, err)
			transactionImpl.endSendOrAckOp(err)
		}
	}

	if p.getProducerState() != producerReady {
		// Producer is closing
		runCallback(newCallback, nil, msg, errProducerClosed)
		return
	}

	// bc only works when DisableBlockIfQueueFull is false
	bc := make(chan struct{})

	// callbackOnce make sure the callback is only invoked once in chunking
	callbackOnce := &sync.Once{}
	sr := &sendRequest{
		ctx:              ctx,
		msg:              msg,
		callback:         newCallback,
		callbackOnce:     callbackOnce,
		flushImmediately: flushImmediately,
		publishTime:      time.Now(),
		blockCh:          bc,
		closeBlockChOnce: &sync.Once{},
		transaction:      txn,
	}

	// Interceptors see the message before it is handed to the data channel.
	p.options.Interceptors.BeforeSend(p, msg)

	p.dataChan <- sr

	if !p.options.DisableBlockIfQueueFull {
		// block if queue full
		<-bc
	}
}