in bindings/servicemix-cxf-bc/src/main/java/org/apache/servicemix/cxfbc/CxfBcProviderMessageObserver.java [106:251]
/**
 * Handles a message delivered to this observer: runs it through the inbound
 * interceptor chain and then completes the JBI {@link MessageExchange}
 * associated with it (out message, fault, DONE or ERROR).
 *
 * <p>When no JBI exchange is attached to the message, the message is handed
 * to a {@link MessageObserver} found on the CXF exchange, on the exchange
 * returned by {@code restoreExchange}, or remembered from an earlier call.
 *
 * <p>Failures are logged rather than propagated. Whatever the outcome,
 * {@code written} is set to {@code true} and every thread waiting on this
 * observer's monitor is notified.
 *
 * @param message the incoming message
 */
public void onMessage(Message message) {
    try {
        PhaseInterceptorChain inChain = createInInterceptorChain();
        contentType = (String) message.get(Message.CONTENT_TYPE);
        SoapMessage soapMessage =
            (SoapMessage) this.providerEndpoint.getCxfEndpoint().getBinding().createMessage(message);
        soapMessage.put(org.apache.cxf.message.Message.REQUESTOR_ROLE, true);
        soapMessage.setInterceptorChain(inChain);

        MessageExchange messageExchange = soapMessage.getExchange().get(MessageExchange.class);
        if (messageExchange == null) {
            if (dispatchWithoutJbiExchange(message, soapMessage)) {
                return;
            }
        } else if (messageExchange.getStatus() != ExchangeStatus.ACTIVE) {
            // the exchange is no longer active, so there is nothing left to answer
            return;
        }

        unwrapDocumentElement(soapMessage);
        inChain.doIntercept(soapMessage);
        closeConnectionStream(soapMessage);

        Exception ex = soapMessage.getContent(Exception.class);
        if (ex != null || soapMessage.getContent(Source.class) == null) {
            // The exchange may only have become available after the steps above,
            // so read it again instead of relying solely on the earlier lookup.
            MessageExchange current = soapMessage.getExchange().get(MessageExchange.class);
            if (ex != null && !(current instanceof InOnly)) {
                MessageExchange failed = messageExchange != null ? messageExchange : current;
                if (failed == null) {
                    java.util.logging.Logger.getLogger(getClass().getName()).log(
                        java.util.logging.Level.WARNING,
                        "No JBI MessageExchange available to report the failure to", ex);
                } else {
                    failed.setStatus(ExchangeStatus.ERROR);
                    failed.setError(ex);
                    providerEndpoint.getContext().getDeliveryChannel().send(failed);
                }
            }
            return;
        }

        messageExchange = soapMessage.getExchange().get(MessageExchange.class);
        if (MessageUtils.isPartialResponse(soapMessage)) {
            // partial response for the original channel when a decoupled endpoint is used
            return;
        }
        if (messageExchange == null) {
            java.util.logging.Logger.getLogger(getClass().getName()).log(
                java.util.logging.Level.WARNING,
                "No JBI MessageExchange associated with the message; nothing to send");
            return;
        }

        populateExchange(messageExchange, soapMessage);

        boolean txSync = messageExchange.getStatus() == ExchangeStatus.ACTIVE
            && messageExchange.isTransacted()
            && Boolean.TRUE.equals(messageExchange.getProperty(JbiConstants.SEND_SYNC));
        if (txSync) {
            providerEndpoint.getContext().getDeliveryChannel().sendSync(messageExchange);
        } else {
            providerEndpoint.getContext().getDeliveryChannel().send(messageExchange);
        }
    } catch (Exception e) {
        java.util.logging.Logger.getLogger(getClass().getName()).log(
            java.util.logging.Level.SEVERE, "Failed to process the incoming message", e);
    } finally {
        synchronized (this) {
            written = true;
            notifyAll();
        }
    }
}

/**
 * Builds the inbound interceptor chain: the bus interceptors, the interceptors
 * this observer always installs, and the endpoint's own interceptors.
 */
private PhaseInterceptorChain createInInterceptorChain() {
    PhaseManager pm = providerEndpoint.getBus().getExtension(PhaseManager.class);
    List<Interceptor<? extends Message>> inList = new ArrayList<Interceptor<? extends Message>>();
    inList.add(new ReadHeadersInterceptor(this.providerEndpoint.getBus()));
    inList.add(new StartBodyInterceptor());
    inList.add(new MustUnderstandInterceptor());
    inList.add(new StaxInInterceptor());
    inList.add(new JbiInWsdl1Interceptor(this.providerEndpoint.isUseJBIWrapper(),
                                         this.providerEndpoint.isUseSOAPEnvelope()));
    if (this.providerEndpoint.isSchemaValidationEnabled()) {
        inList.add(new SchemaValidationInInterceptor(this.providerEndpoint.isUseJBIWrapper(),
                                                     this.providerEndpoint.isUseSOAPEnvelope()));
    }
    inList.add(new AttachmentInInterceptor());

    PhaseInterceptorChain inChain = new PhaseInterceptorChain(pm.getInPhases());
    inChain.add(providerEndpoint.getBus().getInInterceptors());
    inChain.add(inList);
    inChain.add(providerEndpoint.getInInterceptors());
    return inChain;
}

/**
 * Replaces a {@link Document} stored as the message's {@link Node} content
 * with its document element; any other content is left untouched.
 */
private void unwrapDocumentElement(SoapMessage soapMessage) {
    Node nd = soapMessage.getContent(Node.class);
    if (nd instanceof Document) {
        soapMessage.setContent(Node.class, ((Document) nd).getDocumentElement());
    }
}

/**
 * Tries to hand a message that carries no JBI exchange to a message observer.
 *
 * @return {@code true} if an observer consumed the message, {@code false} if
 *         the caller should continue with the regular processing
 */
private boolean dispatchWithoutJbiExchange(Message message, SoapMessage soapMessage) throws Exception {
    // probably a WS-RM response; use the message observer defined in the exchange
    MessageObserver messageObserver = message.getExchange().get(MessageObserver.class);
    if (messageObserver != null) {
        messageObserver.onMessage(message);
        return true;
    }

    // decoupled endpoint case: try to restore the exchange first
    Exchange exchange = restoreExchange(soapMessage);
    unwrapDocumentElement(soapMessage);
    if (exchange != null) {
        MessageObserver rmMessageObserver = exchange.get(MessageObserver.class);
        if (rmMessageObserver != null) {
            // create-sequence message: remember its observer for later messages
            sharedMessageObserver = rmMessageObserver;
            rmMessageObserver.onMessage(soapMessage);
            return true;
        }
    } else if (sharedMessageObserver != null) {
        // acknowledgement message: reuse the remembered observer
        sharedMessageObserver.onMessage(soapMessage);
        return true;
    }
    return false;
}

/**
 * Transfers the processed response onto the JBI exchange: DONE for one-way
 * operations, a fault when the message is flagged {@code jbiFault}, an
 * {@code out} message for InOut/InOptionalOut exchanges, DONE otherwise.
 */
private void populateExchange(MessageExchange messageExchange, SoapMessage soapMessage) throws Exception {
    if (soapMessage.getExchange().get(BindingOperationInfo.class).getOperationInfo().isOneWay()) {
        messageExchange.setStatus(ExchangeStatus.DONE);
    } else if (Boolean.TRUE.equals(soapMessage.get("jbiFault"))) {
        Fault fault = messageExchange.createFault();
        fault.setContent(soapMessage.getContent(Source.class));
        messageExchange.setFault(fault);
        // copy the fault details recorded on the message, when present
        String[] faultKeys = {"faultstring", "faultcode", "hasdetail"};
        for (int i = 0; i < faultKeys.length; i++) {
            Object value = soapMessage.get(faultKeys[i]);
            if (value != null) {
                messageExchange.setProperty(faultKeys[i], value);
            }
        }
    } else if (messageExchange instanceof InOut) {
        setOutMessage(messageExchange, soapMessage);
    } else if (messageExchange instanceof InOptionalOut) {
        if (soapMessage.getContent(Source.class) != null) {
            setOutMessage(messageExchange, soapMessage);
        } else {
            messageExchange.setStatus(ExchangeStatus.DONE);
        }
    } else {
        messageExchange.setStatus(ExchangeStatus.DONE);
    }
}

/**
 * Creates a normalized message from the response's {@link Source} content and
 * attachments and sets it as the exchange's {@code out} message.
 */
private void setOutMessage(MessageExchange messageExchange, SoapMessage soapMessage) throws Exception {
    NormalizedMessage msg = messageExchange.createMessage();
    msg.setContent(soapMessage.getContent(Source.class));
    toNMSAttachments(msg, soapMessage);
    messageExchange.setMessage(msg, "out");
}