public void handleMessage()

in bindings/servicemix-cxf-bc/src/main/java/org/apache/servicemix/cxfbc/CxfBcConsumer.java [791:917]


        public void handleMessage(final Message message) throws Fault {

            final Exchange cxfExchange = message.getExchange();
            final Endpoint endpoint = cxfExchange.get(Endpoint.class);
            final Service service = endpoint.getService();
            final Invoker invoker = service.getInvoker();

            
            
            if (invoker instanceof Servant) {
                // it's rm request, run the invocation directly in bc, not send
                // to se.

                Exchange runableEx = message.getExchange();

                Object result = invoker.invoke(runableEx, getInvokee(message));
                if (!cxfExchange.isOneWay()) {
                    Endpoint end = cxfExchange.get(Endpoint.class);

                    Message outMessage = runableEx.getOutMessage();
                    if (outMessage == null) {
                        outMessage = end.getBinding().createMessage();
                        cxfExchange.setOutMessage(outMessage);
                    }
                    copyJaxwsProperties(message, outMessage);
                    if (result != null) {
                        MessageContentsList resList = null;
                        if (result instanceof MessageContentsList) {
                            resList = (MessageContentsList) result;
                        } else if (result instanceof List) {
                            resList = new MessageContentsList((List) result);
                        } else if (result.getClass().isArray()) {
                            resList = new MessageContentsList((Object[]) result);
                        } else {
                            outMessage.setContent(Object.class, result);
                        }
                        if (resList != null) {
                            outMessage.setContent(List.class, resList);
                        }
                    }
                }

                return;
            }

            MessageExchange exchange = message
                    .getContent(MessageExchange.class);
            ComponentContext context = message.getExchange().get(
                    ComponentContext.class);
            
            String unsubscribeAddress = (String)message.getContextualProperty(CxfBcConsumer.WSN_UNSUBSCRIBE_ADDRESS);
            if (unsubscribeAddress != null && unsubscribeAddress.length() > 0) {
                CxfBcConsumer.this.setTargetEndpoint(unsubscribeAddress);
            }
            
            CxfBcConsumer.this.configureExchangeTarget(exchange);
            CxfBcConsumer.this.isOneway = message.getExchange().get(
                    BindingOperationInfo.class).getOperationInfo().isOneWay();
            message.getExchange().setOneWay(CxfBcConsumer.this.isOneway);
            

            try {
            	if (CxfBcConsumer.this.isOneway) {
                        CxfBcConsumer.this.messages.put(exchange.getExchangeId(), message);
            		context.getDeliveryChannel().send(exchange);
            	} else if ((CxfBcConsumer.this.isSynchronous()
                        && !CxfBcConsumer.this.isOneway)
                        || isServletTransport()) {
                    CxfBcConsumer.this.messages.put(exchange.getExchangeId(), message);
                    context.getDeliveryChannel().sendSync(exchange,
                            timeout * 1000);
                    process(exchange);
                } else {
                    if (isNativeAsyn(message)) {
                        synchronized (message) {

                            if (!((PhaseInterceptorChain)message.getInterceptorChain()).getState()
                                    .equals(State.PAUSED)) {
                                CxfBcConsumer.this.messages.put(exchange
                                        .getExchangeId(), message);
                                context.getDeliveryChannel().send(exchange);
                                message.getInterceptorChain().pause();
                            } else {
                                // retry or timeout
                                if (!((PhaseInterceptorChain)message.getInterceptorChain()).getState()
                                        .equals(State.EXECUTING)) {
                                    messages.remove(exchange.getExchangeId());
                                    // exchange timeout
                                    throw new Exception("Exchange timed out: "
                                            + exchange.getExchangeId());
                                }

                            }

                        }
                    } else {
                        synchronized (((ContinuationProvider) message
                                .get(ContinuationProvider.class.getName()))
                                .getContinuation()) {

                            ContinuationProvider continuationProvider = (ContinuationProvider) message
                                    .get(ContinuationProvider.class.getName());
                            Continuation continuation = continuationProvider
                                    .getContinuation();
                            if (continuation.isNew()) {
                            	continuation.suspend(timeout * 1000);
                                CxfBcConsumer.this.messages.put(exchange
                                        .getExchangeId(), message);
                                context.getDeliveryChannel().send(exchange);
                            } else if (!continuation.isResumed()) {
                                if (!continuation.isPending()) {
                                    messages.remove(exchange.getExchangeId());
                                    continuation.reset();
                                    // exchange timeout
                                    throw new Exception("Exchange timed out: "
                                            + exchange.getExchangeId());
                                }
                            }
                        }
                    }
                }
            } catch (org.apache.cxf.continuations.SuspendedInvocationException e) {
                throw e;
            } catch (Exception e) {
                throw new Fault(e);
            }
        }