void ActiveMQSessionKernel::send()

in activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp [912:1014]


/**
 * Stamps the CMS headers on the caller's message, builds the provider-side copy
 * of it and hands that copy to the connection for delivery.
 *
 * Everything from starting the transaction through the actual send happens while
 * holding this->config->sendMutex.
 *
 * Send path selection:
 *  - oneway (no response awaited) when there is no callback, no positive send
 *    timeout, the message does not require a response, the connection is not in
 *    always-sync mode, and the message is non-persistent, or the connection uses
 *    async send, or the message carries a transaction id.
 *  - syncRequest with a timeout when sendTimeout > 0 and no callback was given.
 *  - asyncRequest(message, onComplete) in every other case; note that onComplete
 *    may be NULL on this path.
 *
 * @param producer
 *      Producer on whose behalf the message is sent; supplies the ProducerInfo,
 *      the next message sequence number and the disable-timestamp flag.
 *      Dereferenced without a NULL check.
 * @param destination
 *      Destination to publish to.  If it is temporary and the connection reports
 *      it as deleted the send is rejected.
 * @param message
 *      The caller's message.  Its delivery mode, timestamp, expiration, priority,
 *      redelivered flag, message id and destination are updated in place.
 *      Dereferenced without a NULL check.
 * @param deliveryMode
 *      Value stored as the message's CMS delivery mode.
 * @param priority
 *      Value stored as the message's CMS priority.
 * @param timeToLive
 *      When greater than zero, and timestamps are not disabled on the producer,
 *      the expiration is set to (current time in milliseconds + timeToLive);
 *      otherwise the expiration is set to zero.
 * @param producerWindow
 *      Optional (may be NULL).  When set, and the message went out on the oneway
 *      path, the size of the sent message is added via enqueueUsage().
 * @param sendTimeout
 *      When greater than zero and no callback is supplied, the send is performed
 *      through syncRequest() with this timeout (presumably milliseconds - confirm
 *      against ActiveMQConnection).  Values larger than an unsigned int can hold
 *      are clamped to the largest unsigned int.
 * @param onComplete
 *      Optional callback forwarded to asyncRequest().
 *
 * @throws cms::InvalidDestinationException if the destination is a temporary
 *         destination that the connection reports as deleted.  Other failures
 *         pass through AMQ_CATCH_ALL_THROW_CMSEXCEPTION (presumably rethrown as
 *         a cms::CMSException - see the macro definition).
 */
void ActiveMQSessionKernel::send(kernels::ActiveMQProducerKernel* producer, Pointer<commands::ActiveMQDestination> destination,
                                 cms::Message* message, int deliveryMode, int priority, long long timeToLive,
                                 util::MemoryUsage* producerWindow, long long sendTimeout, cms::AsyncCallback* onComplete) {

    try {

        this->checkClosed();

        // Refuse to publish to a temporary destination that has already been deleted.
        if (destination->isTemporary()) {
            Pointer<ActiveMQTempDestination> tempDest = destination.dynamicCast<ActiveMQTempDestination>();
            if (this->connection->isDeleted(tempDest)) {
                throw cms::InvalidDestinationException(
                    std::string("Cannot publish to a deleted Destination: ") + destination->toString());
            }
        }

        synchronized(&this->config->sendMutex) {

            // Ensure that a new transaction is started if this is the first message
            // sent since the last commit, Broker is notified of a new TX.
            doStartTransaction();

            Pointer<TransactionId> txId = this->transaction->getTransactionId();
            Pointer<ProducerInfo> producerInfo = producer->getProducerInfo();
            Pointer<ProducerId> producerId = producerInfo->getProducerId();
            long long sequenceId = producer->getNextMessageSequence();

            // Set the "CMS" header fields on the original message, see JMS 1.1 spec section 3.4.11
            message->setCMSDeliveryMode(deliveryMode);

            // The expiration stays at zero unless timestamps are enabled on the producer
            // and a positive time to live was requested.
            long long expiration = 0LL;
            if (!producer->getDisableMessageTimeStamp()) {
                long long timeStamp = System::currentTimeMillis();
                message->setCMSTimestamp(timeStamp);
                if (timeToLive > 0) {
                    expiration = timeToLive + timeStamp;
                }
            }
            message->setCMSExpiration(expiration);
            message->setCMSPriority(priority);
            message->setCMSRedelivered(false);

            // transform to our own message format here
            commands::Message* transformed = NULL;
            Pointer<commands::Message> amqMessage;

            // Always assign the message ID, regardless of the disable flag.
            // Not adding a message ID will cause an NPE at the broker.
            decaf::lang::Pointer<commands::MessageId> id(new commands::MessageId());
            id->setProducerId(producerId);
            id->setProducerSequenceId(sequenceId);

            // NOTE:
            // Now we copy the message before sending, this allows the user to reuse the
            // message object without interfering with the copy that's being sent.  We
            // could make this step optional to increase performance but for now we won't.
            // To not do this implies that the user must never reuse the message object, or
            // know that the configuration of Transports doesn't involve the message hanging
            // around beyond the point that send returns.  When the transform step results in
            // a new Message object being created we can just use that new instance, but when
            // the original cms::Message pointer was already a commands::Message then we need
            // to clone it.
            if (ActiveMQMessageTransformation::transformMessage(message, connection, &transformed)) {
                amqMessage.reset(transformed);
            } else {
                amqMessage.reset(transformed->cloneDataStructure());
            }

            // Sets the Message ID on the original message per spec.
            message->setCMSMessageID(id->toString());
            message->setCMSDestination(destination.dynamicCast<cms::Destination>().get());

            amqMessage->setMessageId(id);
            amqMessage->getBrokerPath().clear();
            amqMessage->setTransactionId(txId);
            amqMessage->setConnection(this->connection);

            // destination format is provider specific so only set on transformed message
            amqMessage->setDestination(destination);

            amqMessage->onSend();
            amqMessage->setProducerId(producerId);

            if (onComplete == NULL && sendTimeout <= 0 && !amqMessage->isResponseRequired() && !this->connection->isAlwaysSyncSend() &&
                (!amqMessage->isPersistent() || this->connection->isUseAsyncSend() || amqMessage->getTransactionId() != NULL)) {

                // No Response Required, send is asynchronous.
                this->connection->oneway(amqMessage);

                // Only the oneway path accounts the message size against the producer window.
                if (producerWindow != NULL) {
                    producerWindow->enqueueUsage(amqMessage->getSize());
                }

            } else {
                if (sendTimeout > 0 && onComplete == NULL) {
                    // The timeout is handed over as an unsigned int.  Clamp instead of casting
                    // blindly: an oversized long long would otherwise wrap modulo 2^32 and
                    // could turn a very long timeout into a tiny one, or into zero.
                    const unsigned int maxTimeout = static_cast<unsigned int>(-1);
                    const unsigned int timeout = sendTimeout > static_cast<long long>(maxTimeout) ?
                        maxTimeout : static_cast<unsigned int>(sendTimeout);
                    this->connection->syncRequest(amqMessage, timeout);
                } else {
                    // Reached with a callback, or with no callback and no positive timeout
                    // when the oneway conditions above were not met.
                    this->connection->asyncRequest(amqMessage, onComplete);
                }
            }
        }
    }
    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}