in bindings/servicemix-cxf-bc/src/main/java/org/apache/servicemix/cxfbc/CxfBcConsumer.java [791:917]
public void handleMessage(final Message message) throws Fault {
final Exchange cxfExchange = message.getExchange();
final Endpoint endpoint = cxfExchange.get(Endpoint.class);
final Service service = endpoint.getService();
final Invoker invoker = service.getInvoker();
if (invoker instanceof Servant) {
// it's rm request, run the invocation directly in bc, not send
// to se.
Exchange runableEx = message.getExchange();
Object result = invoker.invoke(runableEx, getInvokee(message));
if (!cxfExchange.isOneWay()) {
Endpoint end = cxfExchange.get(Endpoint.class);
Message outMessage = runableEx.getOutMessage();
if (outMessage == null) {
outMessage = end.getBinding().createMessage();
cxfExchange.setOutMessage(outMessage);
}
copyJaxwsProperties(message, outMessage);
if (result != null) {
MessageContentsList resList = null;
if (result instanceof MessageContentsList) {
resList = (MessageContentsList) result;
} else if (result instanceof List) {
resList = new MessageContentsList((List) result);
} else if (result.getClass().isArray()) {
resList = new MessageContentsList((Object[]) result);
} else {
outMessage.setContent(Object.class, result);
}
if (resList != null) {
outMessage.setContent(List.class, resList);
}
}
}
return;
}
MessageExchange exchange = message
.getContent(MessageExchange.class);
ComponentContext context = message.getExchange().get(
ComponentContext.class);
String unsubscribeAddress = (String)message.getContextualProperty(CxfBcConsumer.WSN_UNSUBSCRIBE_ADDRESS);
if (unsubscribeAddress != null && unsubscribeAddress.length() > 0) {
CxfBcConsumer.this.setTargetEndpoint(unsubscribeAddress);
}
CxfBcConsumer.this.configureExchangeTarget(exchange);
CxfBcConsumer.this.isOneway = message.getExchange().get(
BindingOperationInfo.class).getOperationInfo().isOneWay();
message.getExchange().setOneWay(CxfBcConsumer.this.isOneway);
try {
if (CxfBcConsumer.this.isOneway) {
CxfBcConsumer.this.messages.put(exchange.getExchangeId(), message);
context.getDeliveryChannel().send(exchange);
} else if ((CxfBcConsumer.this.isSynchronous()
&& !CxfBcConsumer.this.isOneway)
|| isServletTransport()) {
CxfBcConsumer.this.messages.put(exchange.getExchangeId(), message);
context.getDeliveryChannel().sendSync(exchange,
timeout * 1000);
process(exchange);
} else {
if (isNativeAsyn(message)) {
synchronized (message) {
if (!((PhaseInterceptorChain)message.getInterceptorChain()).getState()
.equals(State.PAUSED)) {
CxfBcConsumer.this.messages.put(exchange
.getExchangeId(), message);
context.getDeliveryChannel().send(exchange);
message.getInterceptorChain().pause();
} else {
// retry or timeout
if (!((PhaseInterceptorChain)message.getInterceptorChain()).getState()
.equals(State.EXECUTING)) {
messages.remove(exchange.getExchangeId());
// exchange timeout
throw new Exception("Exchange timed out: "
+ exchange.getExchangeId());
}
}
}
} else {
synchronized (((ContinuationProvider) message
.get(ContinuationProvider.class.getName()))
.getContinuation()) {
ContinuationProvider continuationProvider = (ContinuationProvider) message
.get(ContinuationProvider.class.getName());
Continuation continuation = continuationProvider
.getContinuation();
if (continuation.isNew()) {
continuation.suspend(timeout * 1000);
CxfBcConsumer.this.messages.put(exchange
.getExchangeId(), message);
context.getDeliveryChannel().send(exchange);
} else if (!continuation.isResumed()) {
if (!continuation.isPending()) {
messages.remove(exchange.getExchangeId());
continuation.reset();
// exchange timeout
throw new Exception("Exchange timed out: "
+ exchange.getExchangeId());
}
}
}
}
}
} catch (org.apache.cxf.continuations.SuspendedInvocationException e) {
throw e;
} catch (Exception e) {
throw new Fault(e);
}
}