in cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRConduitOutputStream.java [193:301]
/**
 * Converts the pending CXF {@code message} into an NMR exchange and sends it
 * through {@code channel}.
 * <p>
 * The exchange pattern is {@code InOnly} when {@code isOneWay} is set,
 * {@code RobustInOnly} when the binding operation declares no output, and
 * {@code InOut} otherwise. The body, attachments, eligible message
 * properties, protocol headers and security subject are copied onto the
 * exchange's "in" message. The target is looked up in the NMR endpoint
 * registry from the interface name, the service name and, when available,
 * an endpoint name taken from an {@code nmr:} address or the port name.
 * The exchange is sent via {@code syncInvoke} when the CXF exchange is
 * synchronous and via {@code asynInvokeWithWorkQueue} otherwise.
 *
 * @throws IOException if sending fails; any other exception raised while
 *         building or sending the exchange is logged at SEVERE and rethrown
 *         as an {@code IOException} that carries the original as its cause
 */
private void commitOutputMessage() throws IOException {
    try {
        Member member = (Member) message.get(Method.class.getName());
        Class<?> clz = member.getDeclaringClass();
        Exchange exchange = message.getExchange();
        BindingOperationInfo bop = exchange.get(BindingOperationInfo.class);

        LOG.fine(new org.apache.cxf.common.i18n.Message("INVOKE.SERVICE", LOG).toString() + clz);

        WebService ws = clz.getAnnotation(WebService.class);
        // An explicit check instead of "assert": assertions are normally
        // disabled at runtime, which would turn a missing annotation into an
        // anonymous NullPointerException on the next line.
        if (ws == null) {
            throw new IllegalStateException("Class " + clz.getName()
                + " is not annotated with @WebService");
        }
        QName interfaceName = new QName(ws.targetNamespace(), ws.name());

        QName serviceName;
        String address = null;
        String portName = null;
        if (target != null) {
            serviceName = EndpointReferenceUtils.getServiceName(target, conduit.getBus());
            address = EndpointReferenceUtils.getAddress(target);
            portName = EndpointReferenceUtils.getPortName(target);
        } else {
            serviceName = exchange.get(org.apache.cxf.service.Service.class).getName();
        }

        LOG.fine(new org.apache.cxf.common.i18n.Message("CREATE.MESSAGE.EXCHANGE", LOG).toString() + serviceName);
        org.apache.servicemix.nmr.api.Exchange xchng;
        if (isOneWay) {
            xchng = channel.createExchange(Pattern.InOnly);
        } else if (bop.getOutput() == null) {
            xchng = channel.createExchange(Pattern.RobustInOnly);
        } else {
            xchng = channel.createExchange(Pattern.InOut);
        }

        org.apache.servicemix.nmr.api.Message inMsg = xchng.getIn();
        LOG.fine(new org.apache.cxf.common.i18n.Message("EXCHANGE.ENDPOINT", LOG).toString() + serviceName);
        LOG.fine("setup message contents on " + inMsg);
        inMsg.setBody(getMessageContent(message));

        // Copy attachments. "message" was already dereferenced above, so only
        // the attachment collection itself can be null here.
        if (message.getAttachments() != null) {
            for (Attachment att : message.getAttachments()) {
                inMsg.addAttachment(att.getId(), att.getDataHandler());
            }
        }

        // Copy properties. Only Serializable values are copied; maps and
        // collections are excluded because their entries may not be
        // Serializable themselves.
        for (Map.Entry<String, Object> ent : message.entrySet()) {
            Object value = ent.getValue();
            if (value instanceof Serializable
                && !(value instanceof Map)
                && !(value instanceof Collection)) {
                inMsg.setHeader(ent.getKey(), value);
            }
        }

        // Copy the protocol headers. They are set after the properties, so a
        // protocol header replaces a property stored under the same key.
        @SuppressWarnings("unchecked")
        Map<String, List<String>> protocolHeaders =
            (Map<String, List<String>>) message.get(Message.PROTOCOL_HEADERS);
        if (protocolHeaders != null) {
            for (Map.Entry<String, List<String>> ent : protocolHeaders.entrySet()) {
                List<String> values = ent.getValue();
                // Skip headers without values (null or empty list).
                if (values != null && !values.isEmpty()) {
                    // here we just put the first header value
                    inMsg.setHeader(ent.getKey(), values.get(0));
                }
            }
        }

        // Copy the security subject.
        inMsg.setSecuritySubject((Subject) message.get(NMRTransportFactory.NMR_SECURITY_SUBJECT));

        LOG.fine("service for exchange " + serviceName);

        // Properties used to look up the target endpoint in the registry.
        Map<String, Object> refProps = new HashMap<String, Object>();
        refProps.put(Endpoint.INTERFACE_NAME, interfaceName.toString());
        if (serviceName != null) {
            refProps.put(Endpoint.SERVICE_NAME, serviceName.toString());
        }
        if (address != null && address.startsWith("nmr:")) {
            // Endpoint name is the address without the "nmr:" prefix and
            // without any "?..." suffix.
            int queryStart = address.indexOf('?');
            if (queryStart > 0) {
                refProps.put(Endpoint.NAME, address.substring(4, queryStart));
            } else {
                refProps.put(Endpoint.NAME, address.substring(4));
            }
        } else if (portName != null) {
            refProps.put(Endpoint.NAME, portName);
        }

        Reference ref = channel.getNMR().getEndpointRegistry().lookup(refProps);
        xchng.setTarget(ref);
        xchng.setOperation(bop.getName());

        LOG.fine("sending message");
        if (exchange.isSynchronous()) {
            syncInvoke(xchng);
        } else {
            asynInvokeWithWorkQueue(xchng);
        }
    } catch (IOException e) {
        throw e;
    } catch (Exception e) {
        LOG.log(Level.SEVERE, e.getMessage(), e);
        // Keep the original exception as the cause so its stack trace is not
        // lost; initCause is used because it is available on every JDK level.
        IOException wrapped = new IOException(e.toString());
        wrapped.initCause(e);
        throw wrapped;
    }
}