public void processInBound()

in core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java [551:662]


    public void processInBound(MessageExchangeImpl me) throws MessagingException {
        LOGGER.trace("Processing inbound exchange: {}", me);
        // Check if the delivery channel has been closed
        checkNotClosed();
        // Retrieve the original exchange sent
        MessageExchangeImpl original = exchangesById.get(me.getKey());
        if (original != null && me != original) {
            original.copyFrom(me);
            me = original;
        }
        // Check if the incoming exchange is a response to a synchronous
        // exchange previously sent
        // In this case, we do not have to queue it, but rather notify the
        // waiting thread.
        if (me.getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
            // If the mirror has been delivered using push, better wait until
            // the push call return. This can only work if not using clustered
            // flows,
            // but the flag is transient so we do not care.
            // Ensure that data is uptodate with the incoming exchange (in
            // case the exchange has
            // been serialized / deserialized by a clustered flow)
            suspendTx(original);
            me.setSyncState(MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED);
            notifyExchange(original, original, "processInboundSynchronousExchange");
            return;
        }

        // If the component implements the MessageExchangeListener,
        // the delivery can be made synchronously, so we don't need
        // to bother with transactions
        MessageExchangeListener listener = getExchangeListener();
        if (listener != null && this.container.isOptimizedDelivery()) {
            me.handleAccept();
            LOGGER.trace("Received: {}", me);
            // Call input listeners
            ExchangeListener[] l = (ExchangeListener[]) container.getListeners(ExchangeListener.class);
            ExchangeEvent event = new ExchangeEvent(me, ExchangeEvent.EXCHANGE_ACCEPTED);
            for (int i = 0; i < l.length; i++) {
                try {
                    l[i].exchangeAccepted(event);
                } catch (Exception e) {
                    LOGGER.warn("Error calling listener: {}", e.getMessage(), e);
                }
            }
            // Set the flag the the exchange was delivered using push mode
            // This is important for transaction boundaries
            me.setPushDeliver(true);
            // Deliver the exchange
            ClassLoader old = Thread.currentThread().getContextClassLoader();
            try {
                Thread.currentThread().setContextClassLoader(component.getComponent().getClass().getClassLoader());
                listener.onMessageExchange(me);
            } finally {
                Thread.currentThread().setContextClassLoader(old);
            }
            // TODO: handle delayed exchange notifications
            return;
        }

        // Component uses pull delivery.

        // If the exchange is transacted, special care should be taken.
        // But if the exchange is no more ACTIVE, just queue it, as
        // we will never have an answer back.
        if (me.isTransacted() && me.getStatus() == ExchangeStatus.ACTIVE) {
            // If the transaction is conveyed by the exchange
            // We do not need to resume the transaction in this thread
            if (me.getTxState() == MessageExchangeImpl.TX_STATE_CONVEYED) {
                try {
                    suspendTx(me);
                    queue.put(me);
                } catch (InterruptedException e) {
                    LOGGER.debug("Exchange {} aborted due to thread interruption", me.getExchangeId(), e);
                    me.getPacket().setAborted(true);
                }
            // Else the delivery / send are enlisted in the current tx.
            // We must suspend the transaction, queue it, and wait for the
            // answer
            // to be sent, at which time the tx should be suspended and resumed
            // in
            // this thread.
            } else {
                Object lock = new Object();
                synchronized (lock) {
                    try {
                        me.setTxLock(lock);
                        suspendTx(me);
                        queue.put(me);
                        waitForExchange(me, lock, 0, "processInboundTransactionalExchange");
                    } catch (InterruptedException e) {
                        LOGGER.debug("Exchange {} aborted due to thread interruption", me.getExchangeId(), e);
                        me.getPacket().setAborted(true);
                    } finally {
                        me.setTxLock(null);
                        resumeTx(me);
                    }
                }
            }
        // If the exchange is ACTIVE, the transaction boundary will suspended
        // when the
        // answer is sent
        // Else just queue the exchange
        } else {
            try {
                queue.put(me);
            } catch (InterruptedException e) {
                LOGGER.debug("Exchange {} aborted due to thread interruption", me.getExchangeId(), e);
                me.getPacket().setAborted(true);
            }
        }
    }