protected void processExchange()

in jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java [704:873]


    protected void processExchange(JmsRequestor requestor, Exchange exchange)  throws Exception {
        synchronized (requestor) {
            decrementPendingExchangeIfNeeded(exchange);
            boolean rollbackOnErrors;
            if (exchange.getRole() == Role.Consumer) {
                rollbackOnErrors = Boolean.TRUE.equals(exchange.getProperty(PROPERTY_ROLLBACK_ON_ERRORS + "." + name));
            } else {
                rollbackOnErrors = this.rollbackOnErrors;
            }
            if (exchange.getStatus() == Status.Active) {
                Message msg = exchange.getFault(false);
                int type;
                if (msg != null) {
                    type = JBI_MESSAGE_FAULT;
                } else {
                    msg = exchange.getOut(false);
                    if (msg != null) {
                        type = JBI_MESSAGE_OUT;
                    } else {
                        msg = exchange.getIn(false);
                        if (msg != null) {
                            type = JBI_MESSAGE_IN;
                        } else {
                            throw new IllegalStateException("No normalized message on an active exchange: " + exchange);
                        }
                    }
                }
                javax.jms.Message message = requestor.getSession().createObjectMessage(msg);
                message.setIntProperty(JBI_MESSAGE, type);
                if (type == JBI_MESSAGE_IN) {
                    rollbackOnErrors = this.rollbackOnErrors;
                    exchange.setProperty(PROPERTY_ROLLBACK_ON_ERRORS + "." + name, rollbackOnErrors);
                    message.setStringProperty(JBI_MEP, exchange.getPattern().getWsdlUri());
                    if (exchange.getProperty(MessageExchangeImpl.INTERFACE_NAME_PROP) != null) {
                        message.setStringProperty(JBI_INTERFACE, exchange.getProperty(MessageExchangeImpl.INTERFACE_NAME_PROP).toString());
                    }
                    if (exchange.getOperation() != null) {
                        message.setStringProperty(JBI_OPERATION, exchange.getOperation().toString());
                    }
                    if (exchange.getProperty(MessageExchangeImpl.SERVICE_NAME_PROP) != null) {
                        message.setStringProperty(JBI_SERVICE, exchange.getProperty(MessageExchangeImpl.SERVICE_NAME_PROP).toString());
                    }
                    if (exchange.getProperty(MessageExchangeImpl.SERVICE_ENDPOINT_PROP) != null) {
                        ServiceEndpoint se = (ServiceEndpoint) exchange.getProperty(MessageExchangeImpl.SERVICE_ENDPOINT_PROP);
                        message.setStringProperty(JBI_ENDPOINT, "{" + se.getServiceName().toString() + "}" + se.getEndpointName());
                    }
                    // TODO: write exchange properties
                }
                message.setBooleanProperty(PROPERTY_ROLLBACK_ON_ERRORS, rollbackOnErrors);
                boolean expectResponse;
                if (!rollbackOnErrors) {
                    expectResponse = true;
                } else {
                    switch (exchange.getPattern()) {
                        case InOnly:
                            expectResponse = false;
                            break;
                        case RobustInOnly:
                            expectResponse = exchange.getRole() == Role.Provider;
                            break;
                        case InOut:
                            expectResponse = exchange.getRole() == Role.Provider;
                            break;
                        default:
                            // TODO:
                            expectResponse = true;
                            break;
                    }
                }
                if (expectResponse) {
                    exchanges.put(exchange.getId(), exchange);
                    message.setStringProperty(PROPERTY_SENDER_CLUSTER_NAME, name);
                    message.setStringProperty(PROPERTY_SENDER_CORR_ID, exchange.getId());
                    if (requestor.getMessage() != null) {
                        message.setStringProperty(ClusterEngine.PROPERTY_CLUSTER_NAME, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CLUSTER_NAME));
                        message.setStringProperty(ClusterEngine.PROPERTY_CORR_ID, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CORR_ID));
                    }
                    requestor.send(message);
                } else {
                    message.setStringProperty(PROPERTY_SENDER_CLUSTER_NAME, name);
                    message.setStringProperty(PROPERTY_SENDER_CORR_ID, null);
                    if (requestor.getMessage() != null) {
                        message.setStringProperty(ClusterEngine.PROPERTY_CLUSTER_NAME, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CLUSTER_NAME));
                        message.setStringProperty(ClusterEngine.PROPERTY_CORR_ID, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CORR_ID));
                    }
                    requestor.send(message);
                    // TODO: send done in the tx synchronization
                    done(exchange);
                }
            } else if (exchange.getStatus() == Status.Done) {
                boolean doSend;
                if (!rollbackOnErrors) {
                    doSend = true;
                } else {
                    switch (exchange.getPattern()) {
                        case InOnly:
                            // never send done for InOnly
                            doSend = false;
                            break;
                        case RobustInOnly:
                            // only send done when there is no fault
                            // which means the exchange has a consumer role
                            doSend = exchange.getRole() == Role.Consumer;
                            break;
                        case InOptionalOut:
                            // TODO
                            doSend = true;
                            break;
                        case InOut:
                            // in an InOut mep, the DONE status always come from the JBI consumer
                            doSend = false;
                            break;
                        default:
                            throw new IllegalStateException("Unsupported MEP: " + exchange.getPattern());
                    }
                }
                if (doSend) {
                    javax.jms.Message message = requestor.getSession().createMessage();
                    message.setIntProperty(JBI_MESSAGE, JBI_MESSAGE_DONE);
                    message.setStringProperty(PROPERTY_SENDER_CLUSTER_NAME, name);
                    message.setStringProperty(PROPERTY_SENDER_CORR_ID, null);
                    if (requestor.getMessage() != null) {
                        message.setStringProperty(ClusterEngine.PROPERTY_CLUSTER_NAME, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CLUSTER_NAME));
                        message.setStringProperty(ClusterEngine.PROPERTY_CORR_ID, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CORR_ID));
                    }
                    requestor.send(message);
                }
            } else if (exchange.getStatus() == Status.Error) {
                boolean doSend;
                if (!rollbackOnErrors) {
                    doSend = true;
                } else {
                    switch (exchange.getPattern()) {
                        case InOnly:
                            // never send errors for InOnly
                            doSend = false;
                            break;
                        case RobustInOnly:
                            // do not send exchange from the provider back to the consumer
                            doSend = pool.getTransacted() == Transacted.None || exchange.getRole() != Role.Consumer;
                            break;
                        case InOptionalOut:
                            // TODO
                            doSend = true;
                            break;
                        case InOut:
                            doSend = pool.getTransacted() == Transacted.None || exchange.getRole() != Role.Consumer;
                            break;
                        default:
                            throw new IllegalStateException("Unsupported MEP: " + exchange.getPattern());
                    }
                }
                if (doSend) {
                    javax.jms.Message message = requestor.getSession().createObjectMessage(exchange.getError());
                    message.setIntProperty(JBI_MESSAGE, JBI_MESSAGE_ERROR);
                    message.setStringProperty(PROPERTY_SENDER_CLUSTER_NAME, name);
                    message.setStringProperty(PROPERTY_SENDER_CORR_ID, null);
                    if (requestor.getMessage() != null) {
                        message.setStringProperty(ClusterEngine.PROPERTY_CLUSTER_NAME, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CLUSTER_NAME));
                        message.setStringProperty(ClusterEngine.PROPERTY_CORR_ID, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CORR_ID));
                    }
                    requestor.send(message);
                } else {
                    requestor.setRollbackOnly();
                }
            } else {
                throw new IllegalStateException("Unknown exchange status: " + exchange);
            }
        }
    }