in cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRDestination.java [134:188]
/**
 * Turns an NMR exchange into a CXF message and hands it to this
 * destination's message observer.
 *
 * <p>Nothing happens when {@code exchange} is {@code null} or its status is
 * not {@code Status.Active}. For the {@code InOnly} and {@code RobustInOnly}
 * patterns the exchange is marked {@code Status.Done} and sent on the channel
 * before the CXF message is built.
 *
 * <p>The CXF message carries the exchange itself, the "in" body as an
 * {@link InputStream}, the attachments, the headers (also mirrored into a
 * case-insensitive protocol header map when the value is a {@code String})
 * and the security subject of the "in" message.
 *
 * @param exchange the NMR exchange to dispatch; may be {@code null}
 * @throws ServiceMixException wrapping any exception raised while the
 *         message is being prepared or while the observer handles it
 */
public void process(Exchange exchange) {
    if (exchange == null) {
        return;
    }
    if (exchange.getStatus() != Status.Active) {
        return;
    }

    // NOTE(review): the exchange is completed before the observer runs, so a
    // later failure cannot be reported through it - confirm this is intended,
    // in particular for RobustInOnly.
    final boolean oneWay = exchange.getPattern() == Pattern.InOnly
        || exchange.getPattern() == Pattern.RobustInOnly;
    if (oneWay) {
        exchange.setStatus(Status.Done);
        getChannel().send(exchange);
    }

    final QName operation = exchange.getOperation();
    getLogger().fine("dispatch method: " + operation);

    final org.apache.servicemix.nmr.api.Message nmrIn = exchange.getIn();
    try {
        final MessageImpl cxfIn = new MessageImpl();
        cxfIn.put(Exchange.class, exchange);

        // body: expose the NMR Source payload to CXF as a stream
        final InputStream payload =
            NMRMessageHelper.convertMessageToInputStream(nmrIn.getBody(Source.class));
        cxfIn.setContent(InputStream.class, payload);

        // attachments: wrap each NMR attachment in a CXF AttachmentImpl
        final Collection<Attachment> attachments = new ArrayList<Attachment>();
        for (Map.Entry<String, Object> part : nmrIn.getAttachments().entrySet()) {
            final DataHandler handler = (DataHandler) part.getValue();
            attachments.add(new AttachmentImpl(part.getKey(), handler));
        }
        cxfIn.setAttachments(attachments);

        // headers: the protocol header map is registered on the message before
        // it is filled, so the loop below populates the registered instance
        final Map<String, List<String>> protocolHeaders =
            new TreeMap<String, List<String>>(String.CASE_INSENSITIVE_ORDER);
        cxfIn.put(Message.PROTOCOL_HEADERS, protocolHeaders);
        for (Map.Entry<String, Object> header : nmrIn.getHeaders().entrySet()) {
            final String headerName = header.getKey();
            final Object headerValue = header.getValue();

            // every header except the requestor role becomes a message property
            if (!headerName.equals(Message.REQUESTOR_ROLE)) {
                cxfIn.put(headerName, headerValue);
            }
            // only String values are mirrored as protocol headers
            if (headerValue instanceof String) {
                final List<String> single = new ArrayList<String>();
                single.add((String) headerValue);
                protocolHeaders.put(headerName, single);
            }
        }

        // security subject
        cxfIn.put(NMRTransportFactory.NMR_SECURITY_SUBJECT, nmrIn.getSecuritySubject());

        cxfIn.setDestination(this);
        getMessageObserver().onMessage(cxfIn);
    } catch (Exception failure) {
        final String logText =
            new org.apache.cxf.common.i18n.Message("ERROR.PREPARE.MESSAGE", getLogger()).toString();
        getLogger().log(Level.SEVERE, logText, failure);
        throw new ServiceMixException(failure);
    }
}