in engines/servicemix-cxf-se/src/main/java/org/apache/servicemix/cxf/transport/jbi/JBIConduitOutputStream.java [87:204]
private void commitOutputMessage() throws IOException {
try {
Member member = (Member) message.get(Method.class.getName());
Class<?> clz = member.getDeclaringClass();
Exchange exchange = message.getExchange();
BindingOperationInfo bop = exchange.get(BindingOperationInfo.class);
LOG.info(new org.apache.cxf.common.i18n.Message("INVOKE.SERVICE", LOG).toString() + clz);
WebService ws = clz.getAnnotation(WebService.class);
assert ws != null;
QName interfaceName = new QName(ws.targetNamespace(), ws.name());
QName serviceName = null;
if (target != null) {
serviceName = EndpointReferenceUtils.getServiceName(target,
message.getExchange().get(Bus.class));
} else {
serviceName = message.getExchange().get(org.apache.cxf.service.Service.class).getName();
}
MessageExchangeFactory factory = channel.createExchangeFactoryForService(serviceName);
LOG.info(new org.apache.cxf.common.i18n.Message("CREATE.MESSAGE.EXCHANGE", LOG).toString()
+ serviceName);
MessageExchange xchng = null;
if (isOneWay) {
xchng = factory.createInOnlyExchange();
} else if (bop.getOutput() == null) {
xchng = factory.createRobustInOnlyExchange();
} else {
xchng = factory.createInOutExchange();
}
NormalizedMessage inMsg = xchng.createMessage();
LOG.info(new org.apache.cxf.common.i18n.Message("EXCHANGE.ENDPOINT", LOG).toString()
+ xchng.getEndpoint());
if (inMsg != null) {
LOG.info("setup message contents on " + inMsg);
inMsg.setContent(getMessageContent(message));
xchng.setService(serviceName);
LOG.info("service for exchange " + serviceName);
xchng.setInterfaceName(interfaceName);
xchng.setOperation(bop.getName());
//copy context
Map<String, Object> invocationContext =
CastUtils.cast((Map)message.get(Message.INVOCATION_CONTEXT));
if (invocationContext != null) {
for (Map.Entry<String, Object> ent
: CastUtils.cast((Map)invocationContext.get("RequestContext"),
String.class, Object.class).entrySet()) {
// check if value is Serializable, and if value is Map
// or collection,
// just exclude it since the entry of it may not be
// Serializable as well
if (ent.getValue() instanceof Serializable
&& !(ent.getValue() instanceof Map)
&& !(ent.getValue() instanceof Collection)) {
inMsg.setProperty(ent.getKey(), ent.getValue());
}
}
}
xchng.setMessage(inMsg, "in");
LOG.info("sending message");
if (!isOneWay) {
channel.sendSync(xchng);
NormalizedMessage outMsg = ((InOut)xchng).getOutMessage();
Source content = null;
Set normalizedMessageProps = null;
if (outMsg != null) {
content = outMsg.getContent();
normalizedMessageProps = outMsg.getPropertyNames();
} else {
if (((InOut)xchng).getFault() == null) {
throw xchng.getError();
}
content = ((InOut)xchng).getFault().getContent();
normalizedMessageProps = ((InOut)xchng).getFault().getPropertyNames();
}
Message inMessage = new MessageImpl();
message.getExchange().setInMessage(inMessage);
InputStream ins = JBIMessageHelper.convertMessageToInputStream(content);
if (ins == null) {
throw new IOException(new org.apache.cxf.common.i18n.Message(
"UNABLE.RETRIEVE.MESSAGE", LOG).toString());
}
inMessage.setContent(InputStream.class, ins);
inMessage.put(MessageExchange.class, xchng);
if (normalizedMessageProps != null) {
for (Object name : normalizedMessageProps) {
inMessage.put((String) name, outMsg
.getProperty((String) name));
}
}
conduit.getMessageObserver().onMessage(inMessage);
xchng.setStatus(ExchangeStatus.DONE);
channel.send(xchng);
} else {
channel.send(xchng);
}
} else {
LOG.info(new org.apache.cxf.common.i18n.Message("NO.MESSAGE", LOG).toString());
}
} catch (IOException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e.toString());
}
}