protected void pollDeliveryChannel()

in shared-libraries/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java [334:434]


    protected void pollDeliveryChannel() {
        synchronized (polling) {
            polling.set(true);
            polling.notify();
        }
        Executor executor = null;
        ExchangeStatus oldStatus = null;
        MessageExchange newExchange = null;
        while (running.get()) {
            try {
                final MessageExchange exchange = channel.accept(1000L);
                
                if (exchange != null) {
                    newExchange = exchange;
                    oldStatus = exchange.getStatus();
                    executor = exchange.getRole().equals(Role.CONSUMER) ? consumerExecutor : providerExecutor;
                    final Transaction tx = (Transaction) exchange
                            .getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
                    if (tx != null && container.handleTransactions()) {
                        if (transactionManager == null) {
                            throw new IllegalStateException(
                                    "Exchange is enlisted in a transaction, but no transaction manager is available");
                        }
                        transactionManager.suspend();
                    }
                    executor.execute(new Runnable() {
                        public void run() {
                            if (tx != null) {
                                processExchangeInTx(exchange, tx);
                            } else {
                                processExchangeWithoutTx(exchange);
                            }
                        }
                    });
                }
            } catch (Throwable t) {
                if (running.get() == false) {
                    // Should have been interrupted, discard the throwable
                    logger.debug("Polling thread will stop");
                } else {
                    logger.error("Error polling delivery channel", t);
                }
                try {
                    // If we are transacted, check if this exception should
                    // rollback the transaction
                    if (transactionManager != null && transactionManager.getStatus() == Status.STATUS_ACTIVE) {
                        if (exceptionShouldRollbackTx(t)) {
                            transactionManager.setRollbackOnly();
                        }
                        if (!container.handleTransactions()) {
                            transactionManager.suspend();
                        }
                    }
                    if (oldStatus == ExchangeStatus.ACTIVE) {
                        newExchange.setStatus(ExchangeStatus.ERROR);
                        if (t instanceof RejectedExecutionException) {
                            if (t.getMessage() == null || t.getMessage().length() == 0) {
                                t = new RuntimeException(
                                                         "Task rejected from java.util.concurrent.ThreadPoolExecutor, need bigger ThreadPool",
                                                         t);
                            }
                        }
                        newExchange.setError(t instanceof Exception ? (Exception)t : new Exception(t));
                        channel.send(newExchange);
                    }
                } catch (Exception inner) {
                    logger.error("Error setting exchange status to ERROR", inner);
                }
            } finally {
                try {
                    // Check transaction status
                    if (newExchange != null) {
                        Transaction tx = (Transaction)newExchange
                            .getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
                        if (tx != null) {
                            int status = transactionManager.getStatus();
                            // We use pull delivery, so the transaction should already
                            // have been transfered to another thread because the
                            // component
                            // must have answered.
                            if (status != Status.STATUS_NO_TRANSACTION) {
                                logger
                                    .error("Transaction is still active after exchange processing. Trying to rollback transaction.");
                                try {
                                    transactionManager.rollback();
                                } catch (Throwable t) {
                                    logger.error("Error trying to rollback transaction.", t);
                                }
                            }
                        }
                    }
                } catch (Throwable t) {
                    logger.error("Error checking transaction status.", t);
                }
            }
        }
        synchronized (polling) {
            polling.set(false);
            polling.notify();
        }
    }