in jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/GenericJmsRequestorPool.java [336:408]
/**
 * Executes one run of this invoker task.
 * <p>
 * The task registers itself as active, then either runs the ongoing receive
 * loop (when {@code maxMessagesPerTask} is negative) or invokes the listener
 * up to {@code maxMessagesPerTask} times while the container is running.
 * If an exception is caught, resources are cleared and a recovery is
 * attempted unless the recovery marker shows that another party has already
 * recovered since this task started.
 * <p>
 * The bookkeeping that follows (decreasing the active invoker count, updating
 * the idle counter, and rescheduling or removing this invoker) is performed in
 * a {@code finally} block. It therefore also runs when an {@link Error} escapes
 * the receive loop or when the recovery code itself throws; otherwise the
 * active invoker count would stay incremented and this invoker would stay
 * registered in {@code scheduledInvokers}.
 */
public void run() {
    synchronized (lifecycleMonitor) {
        activeInvokerCount++;
        // Wake up anyone waiting on the lifecycle monitor for a count change.
        lifecycleMonitor.notifyAll();
    }
    // NOTE(review): presumably snapshots the shared recovery marker into
    // lastRecoveryMarker for the comparison in the catch block below - confirm.
    updateRecoveryMarker();
    boolean messageReceived = false;
    try {
        if (maxMessagesPerTask < 0) {
            // No per-task message limit: delegate to the ongoing loop.
            messageReceived = executeOngoingLoop();
        }
        else {
            int messageCount = 0;
            while (isRunning() && messageCount < maxMessagesPerTask) {
                // Remember whether at least one invocation received a message.
                messageReceived = (invokeListener() || messageReceived);
                messageCount++;
            }
        }
    }
    catch (Exception ex) {
        clearResources();
        if (!lastMessageSucceeded) {
            // We failed more than once in a row - sleep for recovery interval
            // even before first recovery attempt.
            sleepInbetweenRecoveryAttempts();
        }
        this.lastMessageSucceeded = false;
        boolean alreadyRecovered = false;
        synchronized (recoveryMonitor) {
            if (this.lastRecoveryMarker == currentRecoveryMarker) {
                // Marker unchanged since this task started: recover here and
                // publish a fresh marker so other invokers can detect it.
                handleListenerSetupFailure(ex, false);
                recoverAfterListenerSetupFailure();
                currentRecoveryMarker = new Object();
            }
            else {
                alreadyRecovered = true;
            }
        }
        if (alreadyRecovered) {
            // Report outside the recovery monitor; no recovery is done here.
            handleListenerSetupFailure(ex, true);
        }
    }
    finally {
        // Always undo the registration done at the top of this method, even
        // if a Throwable is propagating out of the try or catch block.
        synchronized (lifecycleMonitor) {
            decreaseActiveInvokerCount();
            lifecycleMonitor.notifyAll();
        }
        if (!messageReceived) {
            this.idleTaskExecutionCount++;
        }
        else {
            this.idleTaskExecutionCount = 0;
        }
        synchronized (lifecycleMonitor) {
            if (!shouldRescheduleInvoker(this.idleTaskExecutionCount) || !rescheduleTaskIfNecessary(this)) {
                // We're shutting down completely.
                scheduledInvokers.remove(this);
                if (logger.isDebugEnabled()) {
                    logger.debug("Lowered scheduled invoker count: " + scheduledInvokers.size());
                }
                lifecycleMonitor.notifyAll();
                clearResources();
            }
            else if (isRunning()) {
                int nonPausedConsumers = getScheduledConsumerCount() - getPausedTaskCount();
                if (nonPausedConsumers < 1) {
                    logger.error("All scheduled consumers have been paused, probably due to tasks having been rejected. " +
                            "Check your thread pool configuration! Manual recovery necessary through a start() call.");
                }
                else if (nonPausedConsumers < getConcurrentConsumers()) {
                    logger.warn("Number of scheduled consumers has dropped below concurrentConsumers limit, probably " +
                            "due to tasks having been rejected. Check your thread pool configuration! Automatic recovery " +
                            "to be triggered by remaining consumers.");
                }
            }
        }
    }
}