public void run()

in jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/GenericJmsRequestorPool.java [336:408]


        public void run() {
            synchronized (lifecycleMonitor) {
                activeInvokerCount++;
                lifecycleMonitor.notifyAll();
            }
            updateRecoveryMarker();
            boolean messageReceived = false;
            try {
                if (maxMessagesPerTask < 0) {
                    messageReceived = executeOngoingLoop();
                }
                else {
                    int messageCount = 0;
                    while (isRunning() && messageCount < maxMessagesPerTask) {
                        messageReceived = (invokeListener() || messageReceived);
                        messageCount++;
                    }
                }
            } catch (Exception ex) {
                clearResources();
                if (!lastMessageSucceeded) {
                    // We failed more than once in a row - sleep for recovery interval
                    // even before first recovery attempt.
                    sleepInbetweenRecoveryAttempts();
                }
                this.lastMessageSucceeded = false;
                boolean alreadyRecovered = false;
                synchronized (recoveryMonitor) {
                    if (this.lastRecoveryMarker == currentRecoveryMarker) {
                        handleListenerSetupFailure(ex, false);
                        recoverAfterListenerSetupFailure();
                        currentRecoveryMarker = new Object();
                    } else {
                        alreadyRecovered = true;
                    }
                }
                if (alreadyRecovered) {
                    handleListenerSetupFailure(ex, true);
                }
            }
            synchronized (lifecycleMonitor) {
                decreaseActiveInvokerCount();
                lifecycleMonitor.notifyAll();
            }
            if (!messageReceived) {
                this.idleTaskExecutionCount++;
            } else {
                this.idleTaskExecutionCount = 0;
            }
            synchronized (lifecycleMonitor) {
                if (!shouldRescheduleInvoker(this.idleTaskExecutionCount) || !rescheduleTaskIfNecessary(this)) {
                    // We're shutting down completely.
                    scheduledInvokers.remove(this);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Lowered scheduled invoker count: " + scheduledInvokers.size());
                    }
                    lifecycleMonitor.notifyAll();
                    clearResources();
                }
                else if (isRunning()) {
                    int nonPausedConsumers = getScheduledConsumerCount() - getPausedTaskCount();
                    if (nonPausedConsumers < 1) {
                        logger.error("All scheduled consumers have been paused, probably due to tasks having been rejected. " +
                                "Check your thread pool configuration! Manual recovery necessary through a start() call.");
                    }
                    else if (nonPausedConsumers < getConcurrentConsumers()) {
                        logger.warn("Number of scheduled consumers has dropped below concurrentConsumers limit, probably " +
                                "due to tasks having been rejected. Check your thread pool configuration! Automatic recovery " +
                                "to be triggered by remaining consumers.");
                    }
                }
            }
        }