in modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportSender.java [115:261]
public void sendMessage(MessageContext msgCtx,
String targetEPR,
OutTransportInfo outTransportInfo) throws AxisFault {
AMQPSender amqpSender;
Integer hashKey = null;
Map<String, String> params = null;
String replyTo = null;
AMQPTransportProducerTx tx;
MessageContext replyMsgCtx = msgCtx.getOperationContext().getMessageContext(
WSDL2Constants.MESSAGE_LABEL_IN);
if (replyMsgCtx != null) {
replyTo = (String) replyMsgCtx.getProperty(AMQPTransportConstant.PROPERTY_AMQP_REPLY_TO);
}
if (replyTo != null) {
// this is a response for a request message(request/response semantic message)
hashKey = replyTo.hashCode();
params = new HashMap<String, String>();
params.put(AMQPTransportConstant.PARAMETER_QUEUE_NAME, replyTo);
String conFacName = (String) msgCtx.getOperationContext().
getMessageContext(WSDL2Constants.MESSAGE_LABEL_IN).
getProperty(AMQPTransportConstant.RESPONSE_CONNECTION_FACTORY_NAME);
if (conFacName == null) {
throw new AxisFault("A message was received with 'reply to' set. But no reply " +
"connection factory name found. Define the parameter '" +
AMQPTransportConstant.PARAMETER_RESPONSE_CONNECTION_FACTORY_NAME +
"' as a service parameter. This response message will be dropped!");
} else {
params.put(AMQPTransportConstant.PARAMETER_CONNECTION_FACTORY_NAME, conFacName);
}
} else {
// this is a normal one way out message
if (targetEPR != null) {
hashKey = new Integer(targetEPR.hashCode());
try {
params = AMQPTransportUtils.parseAMQPUri(targetEPR);
} catch (AMQPTransportException e) {
throw new AxisFault("Error while parsing the AMQP epr '" + targetEPR + "'", e);
}
} else if (outTransportInfo != null && outTransportInfo instanceof AMQPOutTransportInfo) {
AMQPOutTransportInfo info = (AMQPOutTransportInfo) outTransportInfo;
params = info.getParams();
} else {
throw new AxisFault("Could not determine the endpoint information to deliver the message");
}
}
if (cache.hit(hashKey)) {
amqpSender = cache.get(hashKey);
} else {
try {
amqpSender = AMQPSenderFactory.createAMQPSender(connectionFactoryManager, params);
cache.add(hashKey, amqpSender);
} catch (IOException e) {
throw new AxisFault("Could not create the AMQP sender", e);
}
}
try {
String correlationId = (String)
msgCtx.getProperty(AMQPTransportConstant.PROPERTY_AMQP_CORRELATION_ID);
if (correlationId == null) {
correlationId = msgCtx.getMessageID();
}
boolean isInOut = waitForSynchronousResponse(msgCtx);
Semaphore available = null;
if (isInOut) {
replyTo = (String) msgCtx.getProperty(
AMQPTransportConstant.PROPERTY_AMQP_REPLY_TO);
if (replyTo == null) {
replyTo = UUID.randomUUID().toString();
}
available = new Semaphore(0, true);
responseTracker.put(correlationId, available);
}
String useTx = (String) msgCtx.getProperty(AMQPTransportConstant.PROPERTY_PRODUCER_TX);
if (AMQPTransportConstant.AMQP_USE_LWPC.equals(useTx)) {
tx = new AMQPTransportProducerTx(true, amqpSender.getChannel());
} else if (AMQPTransportConstant.AMQP_USE_TX.equals(useTx)) {
tx = new AMQPTransportProducerTx(false, amqpSender.getChannel());
} else {
tx = null;
}
if (tx != null) {
try {
tx.start();
} catch (IOException e) {
throw new AxisFault("Error while initiation tx for message '" +
msgCtx.getMessageID() + "'", e);
}
}
amqpSender.sendAMQPMessage(msgCtx, correlationId, replyTo);
if (tx != null) {
try {
tx.end();
} catch (IOException e) {
throw new AxisFault("Error while terminating tx for message '" +
msgCtx.getMessageID() + "'", e);
} catch (InterruptedException e) {
log.error("Error while terminating tx for message '" +
msgCtx.getMessageID() + "'", e);
Thread.currentThread().interrupt();
}
}
if (isInOut) {
// block and process the response
new AMQPSimpleConsumerTask(
responseHandlingPool,
amqpSender.getChannel(),
replyTo,
responseTracker,
responseMessage).
consume();
try {
available.tryAcquire(semaphoreTimeOut, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
responseTracker.remove(correlationId);
AMQPTransportMessage msg = responseMessage.get(correlationId);
if (msg != null) {
handleSyncResponse(msgCtx, msg, msg.getContentType());
} else {
// we don't have a response yet, so send a fault to client
log.warn("The semaphore with id '" + correlationId + "' was time out while "
+ "waiting for a response, sending a fault to client..");
sendFault(msgCtx,
new Exception("Times out occurs while waiting for a response"));
}
}
} catch (AMQPTransportException e) {
throw new AxisFault("Could not retrieve the connection factory information", e);
} catch (IOException e) {
throw new AxisFault("Could not produce the message into the destination", e);
}
}