protected void process()

in jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java [580:695]


    /**
     * Dispatches the JMS message held by the given requestor according to the
     * integer value of its {@code JBI_MESSAGE} property.
     * <ul>
     *   <li>{@code JBI_MESSAGE_DONE}: removes the correlated exchange and passes it to
     *       {@code done}.</li>
     *   <li>{@code JBI_MESSAGE_ERROR}: removes the correlated exchange and passes it to
     *       {@code fail} together with the message's object payload cast to
     *       {@link Exception}.</li>
     *   <li>{@code JBI_MESSAGE_IN}: creates a new exchange from the message, registers it,
     *       updates the pending-exchange counter and sends it.</li>
     *   <li>{@code JBI_MESSAGE_OUT} / {@code JBI_MESSAGE_FAULT}: looks up the correlated
     *       exchange (without removing it), sets its out / fault message and sends it.</li>
     * </ul>
     *
     * @param requestor the requestor giving access to the incoming JMS message and,
     *                  when present, its transaction
     * @throws JMSException if reading the JMS message fails
     * @throws IllegalStateException if the message type is unknown, if a required
     *         correlation id or MEP property is missing, or if no exchange is
     *         registered under the correlation id
     */
    protected void process(JmsRequestor requestor) throws JMSException {
        javax.jms.Message message = requestor.getMessage();
        int type = message.getIntProperty(JBI_MESSAGE);
        switch (type) {
            case JBI_MESSAGE_DONE: {
                Exchange exchange = removeCorrelatedExchange(message);
                done(exchange);
                break;
            }
            case JBI_MESSAGE_ERROR: {
                Exchange exchange = removeCorrelatedExchange(message);
                // The payload is read only after the exchange has been removed from the map.
                fail(exchange, (Exception) ((ObjectMessage) message).getObject());
                break;
            }
            case JBI_MESSAGE_IN: {
                Exchange exchange = createInboundExchange(message);
                exchanges.put(exchange.getId(), exchange);
                // When the pending count reaches the configured maximum, only the caller that
                // flips the pause flag from false to true invalidates the selector.
                if (pendingExchanges.incrementAndGet() >= maxPendingExchanges
                        && pauseConsumption.compareAndSet(false, true)) {
                    invalidateSelector();
                }
                suspendAndSend(requestor, exchange);
                break;
            }
            case JBI_MESSAGE_OUT: {
                Exchange exchange = lookupCorrelatedExchange(message);
                Message msg = unmarshallMessage(message);
                exchange.setOut(msg);
                // Stored under the exchange's own id, which is not necessarily the key used
                // for the lookup above.
                exchanges.put(exchange.getId(), exchange);
                suspendAndSend(requestor, exchange);
                break;
            }
            case JBI_MESSAGE_FAULT: {
                Exchange exchange = lookupCorrelatedExchange(message);
                Message msg = unmarshallMessage(message);
                exchange.setFault(msg);
                // Stored under the exchange's own id, which is not necessarily the key used
                // for the lookup above.
                exchanges.put(exchange.getId(), exchange);
                suspendAndSend(requestor, exchange);
                break;
            }
            default: {
                throw new IllegalStateException("Received unknown message type: " + type);
            }
        }
    }

    /** Returns the message's correlation id property, failing if it is absent. */
    private String requireCorrelationId(javax.jms.Message message) throws JMSException {
        String corrId = message.getStringProperty(PROPERTY_CORR_ID);
        if (corrId == null) {
            throw new IllegalStateException("Incoming JMS message has no correlationId");
        }
        return corrId;
    }

    /** Removes and returns the exchange registered under the message's correlation id. */
    private Exchange removeCorrelatedExchange(javax.jms.Message message) throws JMSException {
        String corrId = requireCorrelationId(message);
        Exchange exchange = exchanges.remove(corrId);
        if (exchange == null) {
            throw new IllegalStateException("Exchange not found for id " + corrId);
        }
        return exchange;
    }

    /** Returns, without removing it, the exchange registered under the message's correlation id. */
    private Exchange lookupCorrelatedExchange(javax.jms.Message message) throws JMSException {
        String corrId = requireCorrelationId(message);
        Exchange exchange = exchanges.get(corrId);
        if (exchange == null) {
            throw new IllegalStateException("Exchange not found for id " + corrId);
        }
        return exchange;
    }

    /** Builds a new exchange from an incoming message: MEP, addressing properties, target and in message. */
    private Exchange createInboundExchange(javax.jms.Message message) throws JMSException {
        String mep = message.getStringProperty(JBI_MEP);
        if (mep == null) {
            throw new IllegalStateException("Exchange MEP not found for JMS message " + message.getJMSMessageID());
        }
        Exchange exchange = getChannel().createExchange(Pattern.fromWsdlUri(mep));
        exchange.setProperty(PROPERTY_ROLLBACK_ON_ERRORS + "." + name, message.getBooleanProperty(PROPERTY_ROLLBACK_ON_ERRORS));
        if (message.propertyExists(JBI_INTERFACE)) {
            exchange.setProperty(MessageExchangeImpl.INTERFACE_NAME_PROP, QName.valueOf(message.getStringProperty(JBI_INTERFACE)));
        }
        if (message.propertyExists(JBI_OPERATION)) {
            exchange.setOperation(QName.valueOf(message.getStringProperty(JBI_OPERATION)));
        }
        if (message.propertyExists(JBI_SERVICE)) {
            exchange.setProperty(MessageExchangeImpl.SERVICE_NAME_PROP, QName.valueOf(message.getStringProperty(JBI_SERVICE)));
        }
        if (message.propertyExists(JBI_ENDPOINT)) {
            // The property is parsed as a QName whose local part is used as the endpoint name
            // and whose namespace URI is itself parsed as the service QName.
            QName endpointKey = QName.valueOf(message.getStringProperty(JBI_ENDPOINT));
            String endpointName = endpointKey.getLocalPart();
            QName serviceName = QName.valueOf(endpointKey.getNamespaceURI());
            ServiceEndpoint se = getEndpoint(serviceName, endpointName);
            // TODO: check that endpoint exists
            exchange.setProperty(MessageExchangeImpl.SERVICE_ENDPOINT_PROP, se);
        }
        // Re-process JBI addressing
        DeliveryChannelImpl.createTarget(getChannel().getNMR(), exchange);
        // TODO: read exchange properties
        Message msg = unmarshallMessage(message);
        exchange.setIn(msg);
        return exchange;
    }

    /** Tags the exchange with its correlation id, suspends the requestor, attaches any transaction and sends. */
    private void suspendAndSend(JmsRequestor requestor, Exchange exchange) throws JMSException {
        exchange.setProperty(PROPERTY_CORR_ID + "." + name, exchange.getId());
        requestor.suspend(exchange.getId());
        if (requestor.getTransaction() != null) {
            exchange.setProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME, requestor.getTransaction());
        }
        send(exchange);
    }