func()

in pulsar/producer_partition.go [1124:1196]


// internalSendAsync validates msg, registers it with its transaction (if one
// is attached) and hands it over to whoever consumes p.dataChan.
//
// callback is invoked through runCallback with a nil MessageID and a non-nil
// error when the message is rejected in this function:
//   - msg is nil, or has both Value and Payload set;
//   - msg.Transaction is not a usable *transaction, or is not in TxnOpen state;
//   - registering the topic or the send operation with the transaction fails;
//   - the producer is not in the producerReady state.
//
// Otherwise the callback is stored on the sendRequest that is pushed to
// p.dataChan. For transactional messages the stored callback additionally
// calls endSendOrAckOp on the transaction after the user callback has run.
//
// Unless p.options.DisableBlockIfQueueFull is set, the call does not return
// until the request's blockCh yields a value or is closed.
func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
	callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
	if msg == nil {
		p.log.Error("Message is nil")
		runCallback(callback, nil, msg, newError(InvalidMessage, "Message is nil"))
		return
	}

	if msg.Value != nil && msg.Payload != nil {
		p.log.Error("Can not set Value and Payload both")
		runCallback(callback, nil, msg, newError(InvalidMessage, "Can not set Value and Payload both"))
		return
	}

	// Non-transactional messages use the caller's callback unchanged.
	newCallback := callback
	var txn *transaction

	// Register transaction operation to transaction and the transaction coordinator.
	if msg.Transaction != nil {
		// Use the checked form of the type assertion: a Transaction value that is
		// not backed by *transaction (or is a typed nil pointer) must be reported
		// through the callback rather than panicking in the caller's goroutine.
		transactionImpl, ok := msg.Transaction.(*transaction)
		if !ok || transactionImpl == nil {
			p.log.Error("Failed to send message by an unsupported transaction implementation.")
			runCallback(callback, nil, msg,
				newError(InvalidMessage, "Failed to send message by an unsupported transaction implementation."))
			return
		}
		txn = transactionImpl

		if transactionImpl.state != TxnOpen {
			p.log.WithField("state", transactionImpl.state).Error("Failed to send message" +
				" by a non-open transaction.")
			runCallback(callback, nil, msg, newError(InvalidStatus, "Failed to send message by a non-open transaction."))
			return
		}

		if err := transactionImpl.registerProducerTopic(p.topic); err != nil {
			runCallback(callback, nil, msg, err)
			return
		}
		if err := transactionImpl.registerSendOrAckOp(); err != nil {
			runCallback(callback, nil, msg, err)
			return
		}

		// From here on the send operation is registered with the transaction, so
		// every completion path (including the "producer closed" one below) must
		// go through newCallback to pair it with endSendOrAckOp.
		newCallback = func(id MessageID, producerMessage *ProducerMessage, err error) {
			runCallback(callback, id, producerMessage, err)
			transactionImpl.endSendOrAckOp(err)
		}
	}

	if p.getProducerState() != producerReady {
		// Producer is closing
		runCallback(newCallback, nil, msg, errProducerClosed)
		return
	}

	// bc only works when DisableBlockIfQueueFull is false
	bc := make(chan struct{})

	// callbackOnce make sure the callback is only invoked once in chunking
	callbackOnce := &sync.Once{}
	sr := &sendRequest{
		ctx:              ctx,
		msg:              msg,
		callback:         newCallback,
		callbackOnce:     callbackOnce,
		flushImmediately: flushImmediately,
		publishTime:      time.Now(),
		blockCh:          bc,
		closeBlockChOnce: &sync.Once{},
		transaction:      txn,
	}
	p.options.Interceptors.BeforeSend(p, msg)

	p.dataChan <- sr

	if !p.options.DisableBlockIfQueueFull {
		// block if queue full
		<-bc
	}
}