in src/main/java/com/revo/deployr/client/broker/engine/RBrokerEngine.java [260:337]
public RTaskToken submit(RTask task, boolean priority)
throws RBrokerException {
if (refreshingConfig.get()) {
throw new RBrokerException("RTask submissions temporarily " +
"disabled while RBroker configuration refreshes.");
}
try {
/*
* We clone the incoming RTask into a new instance of
* RTask to ensure RTask is unique so it can be used
* as a key inside the taskTokenListenerMap.
*
* How could the value of RTask param not be unique?
*
* For example, RBrokerAppSimulator apps frequently
* create a single RTask instance and submit it many times
* to simulate load.
*/
RTask clonedTask = cloneTask(task);
/*
* Prepare setup for RTaskToken.
*/
RTaskToken rTaskToken = new RTaskTokenImpl(task);
/*
* Appending associated RTaskToken to end of
* liveTaskTokens in preparation for pushing
* RTask onto pending[High,Low]PriorityQueue.
*/
liveTaskTokens.add(rTaskToken);
/*
* Register RTask and associated RTaskToken here.
* Once RTask has been submitted to Executor and
* Future for task exists, we will make an
* RTaskToken.onTask callback to register Future
* on token.
*/
taskTokenListenerMap.put(clonedTask, rTaskToken);
boolean added = false;
if (priority)
added = pendingHighPriorityQueue.offer(clonedTask);
else
added = pendingLowPriorityQueue.offer(clonedTask);
if (!added) {
/*
* Undo (above) setup for RTaskToken that was
* rejected by pending[High,Low]PriorityQueues.
*/
boolean liveTaskTokenRemoved =
liveTaskTokens.remove(rTaskToken);
Object clonedTaskRemoved =
taskTokenListenerMap.remove(clonedTask);
if(!liveTaskTokenRemoved || clonedTaskRemoved == null) {
System.out.println("RBrokerEngine: " +
"Broker at capacity, rTask rejected, cleanup: " +
liveTaskTokenRemoved + ", " + clonedTaskRemoved);
}
throw new RBrokerException("Broker at capacity ( " +
MAX_TASK_QUEUE_SIZE + " ), rejecting task " + clonedTask);
}
return rTaskToken;
} catch (Exception rex) {
throw new RBrokerException("RBroker: " +
"submit failed, cause: " + rex.getMessage(), rex);
}
}