in modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java [77:266]
public InvocationResponse invoke(MessageContext msgContext) throws AxisFault {
log.debug("JMSSender invoke()");
/* Added due to possible bug in Axis2, MTOM enablement is based on msgContext.isDoingMTOM
* However msgContext.isDoingMTOM will always return false unless set programmatically.
* HTTP sets this boolean programmatically by looking up whether enableMTOM has been set
* in axis2.xml or as an option on the client.
*/
msgContext.setDoingMTOM(HTTPTransportUtils.doWriteMTOM(msgContext));
JMSOutTransportInfo transportInfo = null;
String targetAddress = null;
// is there a transport url? which may be different from the WS-A To..
targetAddress = (String) msgContext.getProperty(
Constants.Configuration.TRANSPORT_URL);
if (targetAddress != null) {
transportInfo = new JMSOutTransportInfo(targetAddress);
} else if (targetAddress == null && msgContext.getTo() != null &&
!msgContext.getTo().hasAnonymousAddress()) {
targetAddress = msgContext.getTo().getAddress();
if (!msgContext.getTo().hasNoneAddress()) {
transportInfo = new JMSOutTransportInfo(targetAddress);
} else {
//Don't send the message.
return InvocationResponse.CONTINUE;
}
} else if (msgContext.isServerSide()) {
// get the jms ReplyTo
transportInfo = (JMSOutTransportInfo)
msgContext.getProperty(Constants.OUT_TRANSPORT_INFO);
}
// get the ConnectionFactory to be used for the send
ConnectionFactory connectionFac = transportInfo.getConnectionFactory();
Connection con = null;
try {
String user = transportInfo.getConnectionFactoryUser();
String password = transportInfo.getConnectionFactoryPassword();
if ((user == null) || (password == null)){
// Use the OS username and credentials
con = connectionFac.createConnection();
} else{
// use an explicit username and password
con = connectionFac.createConnection(user, password);
}
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
Message message = createJMSMessage(msgContext, session);
// get the JMS destination for the message being sent
Destination dest = transportInfo.getDestination();
if (dest == null) {
if (targetAddress != null) {
// if it does not exist, create it
String name = JMSUtils.getDestination(targetAddress);
if (log.isDebugEnabled()) {
log.debug("Creating JMS Destination : " + name);
}
try {
dest = session.createQueue(name);
} catch (JMSException e) {
handleException("Error creating destination Queue : " + name, e);
}
} else {
handleException("Cannot send reply to unknown JMS Destination");
}
}
MessageProducer producer = session.createProducer(dest);
Destination replyDest = null;
boolean waitForResponse =
msgContext.getOperationContext() != null &&
WSDL2Constants.MEP_URI_OUT_IN.equals(
msgContext.getOperationContext().getAxisOperation().getMessageExchangePattern());
if (waitForResponse) {
String replyToJNDIName = (String) msgContext.getProperty(JMSConstants.REPLY_PARAM);
if (replyToJNDIName != null && replyToJNDIName.length() > 0) {
Context context = null;
final Hashtable props = JMSUtils.getProperties(targetAddress);
try {
try {
context = (Context) AccessController.doPrivileged(
new PrivilegedExceptionAction() {
public Object run() throws NamingException{
return new InitialContext(props);
}
}
)
;
} catch (PrivilegedActionException e) {
throw (NamingException) e.getException();
}
} catch (NamingException e) {
handleException("Could not get the initial context", e);
}
try {
replyDest = (Destination) context.lookup(replyToJNDIName);
} catch (NameNotFoundException e) {
log.warn("Cannot get or lookup JMS response destination : " +
replyToJNDIName + " : " + e.getMessage() +
". Attempting to create a Queue named : " + replyToJNDIName);
replyDest = session.createQueue(replyToJNDIName);
} catch (NamingException e) {
handleException("Cannot get JMS response destination : " +
replyToJNDIName + " : ", e);
}
} else {
try {
// create temporary queue to receive reply
replyDest = session.createTemporaryQueue();
} catch (JMSException e) {
handleException("Error creating temporary queue for response");
}
}
message.setJMSReplyTo(replyDest);
if (log.isDebugEnabled()) {
log.debug("Expecting a response to JMS Destination : " +
(replyDest instanceof Queue ?
((Queue) replyDest).getQueueName() : ((Topic) replyDest).getTopicName()));
}
}
try {
log.debug("[" + (msgContext.isServerSide() ? "Server" : "Client") +
"]Sending message to destination : " + dest);
producer.send(message);
producer.close();
} catch (JMSException e) {
handleException("Error sending JMS message to destination : " +
dest.toString(), e);
}
if (waitForResponse) {
try {
// wait for reply
MessageConsumer consumer = session.createConsumer(replyDest);
long timeout = JMSConstants.DEFAULT_JMS_TIMEOUT;
Long waitReply = (Long) msgContext.getProperty(JMSConstants.JMS_WAIT_REPLY);
if (waitReply != null) {
timeout = waitReply.longValue();
}
log.debug("Waiting for a maximum of " + timeout +
"ms for a response message to destination : " + replyDest);
con.start();
Message reply = consumer.receive(timeout);
if (reply != null) {
msgContext.setProperty(MessageContext.TRANSPORT_IN,
JMSUtils.getInputStream(reply));
} else {
log.warn("Did not receive a JMS response within " +
timeout + " ms to destination : " + dest);
}
} catch (JMSException e) {
handleException("Error reading response from temporary " +
"queue : " + replyDest, e);
}
}
} catch (JMSException e) {
handleException("Error preparing to send message to destination", e);
} finally {
if (con != null) {
try {
con.close(); // closes all sessions, producers, temp Q's etc
} catch (JMSException e) {
} // ignore
}
}
return InvocationResponse.CONTINUE;
}