in activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp [912:1014]
/**
 * Sends a message to the given destination on behalf of a producer.
 *
 * The CMS header fields (delivery mode, timestamp, expiration, priority,
 * redelivered flag, message id and destination) are written onto the caller's
 * message, then an internal commands::Message copy is populated and handed to
 * the connection. Everything after the destination check runs while holding
 * this session's config->sendMutex.
 *
 * @param producer
 *      The producer kernel performing the send; supplies the ProducerInfo,
 *      the next message sequence number and the "disable timestamp" flag.
 *      Dereferenced without a NULL check.
 * @param destination
 *      Where the message is to be sent. If it is temporary and the connection
 *      reports it as deleted, the send is rejected.
 * @param message
 *      The caller's message. Its CMS headers are modified in place; the object
 *      actually sent is a transformed or cloned copy. Dereferenced without a
 *      NULL check.
 * @param deliveryMode
 *      Value stored via setCMSDeliveryMode.
 * @param priority
 *      Value stored via setCMSPriority.
 * @param timeToLive
 *      Milliseconds added to the current time to compute the expiration. Only
 *      applied when greater than zero and the producer has not disabled
 *      message timestamps; otherwise the expiration is set to 0.
 * @param producerWindow
 *      Optional (may be NULL) usage tracker; on the one-way send path the size
 *      of the sent message is enqueued on it.
 * @param sendTimeout
 *      When greater than zero and no completion callback is given, the message
 *      is sent with syncRequest using this timeout. Values too large for an
 *      unsigned int are clamped to the largest unsigned int.
 * @param onComplete
 *      Optional (may be NULL) callback forwarded to asyncRequest.
 *
 * @throws cms::InvalidDestinationException raised inside the try block when the
 *      temporary destination has been deleted. All exceptions pass through
 *      AMQ_CATCH_ALL_THROW_CMSEXCEPTION, which presumably rethrows them as
 *      CMS exceptions (macro not visible here - confirm).
 */
void ActiveMQSessionKernel::send(kernels::ActiveMQProducerKernel* producer, Pointer<commands::ActiveMQDestination> destination,
                                 cms::Message* message, int deliveryMode, int priority, long long timeToLive,
                                 util::MemoryUsage* producerWindow, long long sendTimeout, cms::AsyncCallback* onComplete) {

    try {

        this->checkClosed();

        // Refuse to publish to a temporary destination the connection has already deleted.
        if (destination->isTemporary()) {
            Pointer<ActiveMQTempDestination> tempDest = destination.dynamicCast<ActiveMQTempDestination>();
            if (this->connection->isDeleted(tempDest)) {
                throw cms::InvalidDestinationException(
                    std::string("Cannot publish to a deleted Destination: ") + destination->toString());
            }
        }

        synchronized(&this->config->sendMutex) {

            // Ensure that a new transaction is started if this is the first message
            // sent since the last commit, Broker is notified of a new TX.
            doStartTransaction();

            // Must be read after doStartTransaction() so the id reflects the started TX.
            Pointer<TransactionId> txId = this->transaction->getTransactionId();
            Pointer<ProducerInfo> producerInfo = producer->getProducerInfo();
            Pointer<ProducerId> producerId = producerInfo->getProducerId();
            long long sequenceId = producer->getNextMessageSequence();

            // Set the "CMS" header fields on the original message, see JMS 1.1 spec section 3.4.11
            message->setCMSDeliveryMode(deliveryMode);

            // Expiration stays 0 unless a timestamp is taken and a positive TTL was given.
            long long expiration = 0LL;
            if (!producer->getDisableMessageTimeStamp()) {
                long long timeStamp = System::currentTimeMillis();
                message->setCMSTimestamp(timeStamp);
                if (timeToLive > 0) {
                    expiration = timeToLive + timeStamp;
                }
            }
            message->setCMSExpiration(expiration);
            message->setCMSPriority(priority);
            message->setCMSRedelivered(false);

            // transform to our own message format here
            commands::Message* transformed = NULL;
            Pointer<commands::Message> amqMessage;

            // Always assign the message ID, regardless of the disable flag.
            // Not adding a message ID will cause an NPE at the broker.
            decaf::lang::Pointer<commands::MessageId> id(new commands::MessageId());
            id->setProducerId(producerId);
            id->setProducerSequenceId(sequenceId);

            // NOTE:
            // Now we copy the message before sending, this allows the user to reuse the
            // message object without interfering with the copy that's being sent. We
            // could make this step optional to increase performance but for now we won't.
            // To not do this implies that the user must never reuse the message object, or
            // know that the configuration of Transports doesn't involve the message hanging
            // around beyond the point that send returns. When the transform step results in
            // a new Message object being created we can just use that new instance, but when
            // the original cms::Message pointer was already a commands::Message then we need
            // to clone it.
            if (ActiveMQMessageTransformation::transformMessage(message, connection, &transformed)) {
                amqMessage.reset(transformed);
            } else {
                amqMessage.reset(transformed->cloneDataStructure());
            }

            // Sets the Message ID on the original message per spec.
            message->setCMSMessageID(id->toString());
            message->setCMSDestination(destination.dynamicCast<cms::Destination>().get());

            amqMessage->setMessageId(id);
            amqMessage->getBrokerPath().clear();
            amqMessage->setTransactionId(txId);
            amqMessage->setConnection(this->connection);

            // destination format is provider specific so only set on transformed message
            amqMessage->setDestination(destination);

            amqMessage->onSend();
            amqMessage->setProducerId(producerId);

            if (onComplete == NULL && sendTimeout <= 0 && !amqMessage->isResponseRequired() && !this->connection->isAlwaysSyncSend() &&
                (!amqMessage->isPersistent() || this->connection->isUseAsyncSend() || amqMessage->getTransactionId() != NULL)) {

                // No Response Required, send is asynchronous.
                this->connection->oneway(amqMessage);

                if (producerWindow != NULL) {
                    producerWindow->enqueueUsage(amqMessage->getSize());
                }

            } else {

                if (sendTimeout > 0 && onComplete == NULL) {

                    // The timeout is handed over as an unsigned int. A plain cast would wrap
                    // values beyond that range (turning a very long timeout into a very short
                    // one), so clamp to the largest representable value instead.
                    // sendTimeout is known to be positive here, which makes the unsigned
                    // comparison below safe.
                    const unsigned int maxTimeout = static_cast<unsigned int>(-1);
                    const unsigned int timeout =
                        static_cast<unsigned long long>(sendTimeout) > maxTimeout ?
                            maxTimeout : static_cast<unsigned int>(sendTimeout);

                    this->connection->syncRequest(amqMessage, timeout);
                } else {
                    // Either a completion callback was supplied or no timeout applies;
                    // onComplete may be NULL here.
                    this->connection->asyncRequest(amqMessage, onComplete);
                }
            }
        }
    }
    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}