in src/main/java/com/revo/deployr/client/broker/engine/RBrokerEngine.java [529:660]
protected abstract RBrokerWorker createBrokerWorker(RTask task,
long taskIndex,
boolean isPriorityTask,
Object resourceToken,
RBrokerEngine brokerEngine);
/*
* RBrokerEngine: private implementation.
*/
private class RBrokerWorkerManager implements Runnable {
public void run() {
try {
/*
* Signal to constructor that the brokerEngineExecutor
* thread is fully initialized and active, allowing
* the constructor to return safely to caller.
*/
engineInitialized.release();
while (taskBrokerIsActive.get()) {
RTask nextTaskInQueue = null;
/*
* Await next queued Task.
*/
boolean priorityTaskAvailable = false;
while (nextTaskInQueue == null &&
taskBrokerIsActive.get()) {
/*
* Retrieves but does not remove the task
* at the head of the queue.
*/
nextTaskInQueue = pendingHighPriorityQueue.peek();
if (nextTaskInQueue == null) {
nextTaskInQueue = pendingLowPriorityQueue.peek();
priorityTaskAvailable = false;
} else {
priorityTaskAvailable = true;
}
if (nextTaskInQueue == null) {
try {
/*
* Avoid busy-wait, sleep.
*/
Thread.currentThread().sleep(50);
} catch (Exception tex) {
}
} else {
/*
* Retrieves and removes the task
* at the head of the queue.
*/
nextTaskInQueue = priorityTaskAvailable ?
pendingHighPriorityQueue.take() :
pendingLowPriorityQueue.take();
}
}
/*
* If task found on queue and taskBroker
* is still active, process task.
*/
if (nextTaskInQueue != null &&
taskBrokerIsActive.get()) {
/*
* Await next available resource token in pool.
*/
Object resourceToken = resourceTokenPool.take();
boolean resourceTokenInUse = false;
try {
RBrokerWorker worker =
createBrokerWorker(nextTaskInQueue,
executorTaskCounter.getAndIncrement(),
priorityTaskAvailable,
resourceToken,
RBrokerEngine.this);
resourceTokenInUse = true;
taskResourceTokenMap.put(nextTaskInQueue, resourceToken);
Future future = taskWorkerExecutor.submit(worker);
RTaskTokenListener taskTokenListener =
taskTokenListenerMap.remove(nextTaskInQueue);
if (taskTokenListener != null) {
taskTokenListener.onTask(nextTaskInQueue, future);
} else {
System.out.println("RBrokerEngine: " +
"taskTokenListener callback not found for " +
nextTaskInQueue + ", unexpected error.");
}
} catch (Exception ex) {
if (!resourceTokenInUse && resourceToken != null) {
/*
* Return unused RProject instance
* to the tail of the project pool.
*/
resourceTokenPool.add(resourceToken);
}
System.out.println("RBrokerEngine: " +
" processing task " + nextTaskInQueue +
", ex=" + ex);
}
} // nextTaskInQueue != null
} // while taskBrokerIsActive
} catch (Exception mex) {
System.out.println("RBrokerEngine: " +
"brokerEngineExecutor.run ex=" + mex);
}
}
}