in shared-libraries/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java [334:434]
/**
 * Polling loop: while {@code running} is set, accepts exchanges from the
 * delivery channel (1 second timeout per attempt) and hands each one to
 * {@code consumerExecutor} or {@code providerExecutor} depending on its role.
 * <p>
 * {@code polling} is set to {@code true} on entry and {@code false} on exit,
 * with a {@code notify()} each time so a thread waiting on it can observe the
 * transition.
 * <p>
 * If accepting or dispatching an exchange fails, the error is logged; when a
 * transaction is active on this thread it may be marked rollback-only and/or
 * suspended, and an exchange that was {@code ACTIVE} when accepted in this
 * iteration is sent back with status {@code ERROR}. After each iteration, if
 * the accepted exchange carried a transaction and one is still associated
 * with this thread, that transaction is rolled back.
 */
protected void pollDeliveryChannel() {
    synchronized (polling) {
        polling.set(true);
        polling.notify();
    }
    while (running.get()) {
        // Per-iteration state. It must not outlive the iteration: otherwise a
        // failure in a later accept() would be reported against an exchange
        // that was already dispatched to a worker thread earlier.
        MessageExchange newExchange = null;
        ExchangeStatus oldStatus = null;
        try {
            final MessageExchange exchange = channel.accept(1000L);
            if (exchange != null) {
                newExchange = exchange;
                oldStatus = exchange.getStatus();
                final Executor executor = exchange.getRole().equals(Role.CONSUMER)
                        ? consumerExecutor : providerExecutor;
                final Transaction tx = (Transaction) exchange
                        .getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
                if (tx != null && container.handleTransactions()) {
                    if (transactionManager == null) {
                        throw new IllegalStateException(
                                "Exchange is enlisted in a transaction, but no transaction manager is available");
                    }
                    // Detach the transaction from the polling thread before
                    // the exchange is processed on an executor thread.
                    transactionManager.suspend();
                }
                executor.execute(new Runnable() {
                    public void run() {
                        if (tx != null) {
                            processExchangeInTx(exchange, tx);
                        } else {
                            processExchangeWithoutTx(exchange);
                        }
                    }
                });
            }
        } catch (Throwable t) {
            if (!running.get()) {
                // Should have been interrupted, discard the throwable
                logger.debug("Polling thread will stop");
            } else {
                logger.error("Error polling delivery channel", t);
            }
            try {
                // If we are transacted, check if this exception should
                // rollback the transaction
                if (transactionManager != null && transactionManager.getStatus() == Status.STATUS_ACTIVE) {
                    if (exceptionShouldRollbackTx(t)) {
                        transactionManager.setRollbackOnly();
                    }
                    if (!container.handleTransactions()) {
                        transactionManager.suspend();
                    }
                }
                // Only an exchange accepted in THIS iteration and still ACTIVE
                // at that time is answered with an error.
                if (oldStatus == ExchangeStatus.ACTIVE) {
                    Throwable cause = t;
                    if (cause instanceof RejectedExecutionException
                            && (cause.getMessage() == null || cause.getMessage().length() == 0)) {
                        // Give the receiver a meaningful message when the
                        // executor rejected the task without one.
                        cause = new RuntimeException(
                                "Task rejected from java.util.concurrent.ThreadPoolExecutor, need bigger ThreadPool",
                                cause);
                    }
                    newExchange.setStatus(ExchangeStatus.ERROR);
                    newExchange.setError(cause instanceof Exception ? (Exception) cause : new Exception(cause));
                    channel.send(newExchange);
                }
            } catch (Exception inner) {
                logger.error("Error setting exchange status to ERROR", inner);
            }
        } finally {
            try {
                // Check transaction status. Without a transaction manager
                // there is nothing that could be queried or rolled back.
                if (newExchange != null && transactionManager != null) {
                    Transaction tx = (Transaction) newExchange
                            .getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
                    if (tx != null) {
                        int status = transactionManager.getStatus();
                        // We use pull delivery, so the transaction should already
                        // have been transferred to another thread because the
                        // component must have answered.
                        if (status != Status.STATUS_NO_TRANSACTION) {
                            logger
                                    .error("Transaction is still active after exchange processing. Trying to rollback transaction.");
                            try {
                                transactionManager.rollback();
                            } catch (Throwable t) {
                                logger.error("Error trying to rollback transaction.", t);
                            }
                        }
                    }
                }
            } catch (Throwable t) {
                logger.error("Error checking transaction status.", t);
            }
        }
    }
    synchronized (polling) {
        polling.set(false);
        polling.notify();
    }
}