in src/main/java/com/revo/deployr/client/broker/engine/PooledTaskBroker.java [142:244]
/**
 * Handles completion of an RTask that ran on a pooled RProject.
 *
 * <p>The RProject associated with the task is removed from
 * {@code taskResourceTokenMap}. What happens next depends on the
 * failure (if any) reported by {@code result}:
 * <ul>
 * <li>Grid failure (an RGridException, or an RSecurityException with
 * error code 403): the RProject is NOT returned to
 * {@code resourceTokenPool}, {@code parallelTaskLimit} is decremented,
 * and {@code brokerListener} (when set) is told about the new pool
 * size. When a {@code taskListener} is set, the task is also
 * re-submitted on the priority queue.</li>
 * <li>Success, or any other failure: the RProject is returned to
 * {@code resourceTokenPool} for use by pending/future tasks.</li>
 * </ul>
 *
 * @param task   the task whose execution has completed.
 * @param result the outcome of that execution; its
 *               {@code getFailure()} is inspected to detect grid
 *               failures.
 */
public void callback(RTask task, RTaskResult result) {

    RProject rProject = (RProject) taskResourceTokenMap.remove(task);

    /*
     * PooledTaskBroker DeployR Grid Fault Tolerance Handling.
     *
     * An RGridException indicates that an RProject (R session)
     * is no longer reachable due to a slot or node failure on
     * the DeployR grid.
     *
     * An RSecurityException with an error code of 403 indicates
     * that an RProject (R session) is no longer reachable due to an
     * unexpected R session termination on the DeployR grid, perhaps
     * caused by the admin forcibly shutting it down.
     */
    Exception failure = result.getFailure();
    boolean gridFailure = failure instanceof RGridException ||
            (failure instanceof RSecurityException &&
                    ((RSecurityException) failure).errorCode == 403);

    if (gridFailure) {

        /*
         * On detection of a grid failure drop the RProject from
         * the pool so further tasks are not directed to that RProject.
         * We achieve this by simply not adding the RProject back to
         * the resourceTokenPool on this callback.
         *
         * We then need to adjust the parallelTaskLimit so the RBroker
         * will report the new (smaller) pool size on
         * RBroker.maxConcurrency() calls.
         */

        /*
         * The automatic retry is only attempted when the result is an
         * RTaskResultImpl, because the retry relies on the repeatTask
         * flag on that class. For any other RTaskResult implementation
         * the failure is left as-is so the listener is notified
         * normally, rather than aborting this callback with a
         * ClassCastException before the pool size is adjusted.
         */
        if (taskListener != null && result instanceof RTaskResultImpl) {

            /*
             * When asynchronous listener in use, failed task
             * executions due to slot or grid failures can be
             * automatically resubmitted for execution by the RBroker.
             *
             * When RTaskResult.repeatTask is enabled the
             * RBrokerEngine.RBrokerListenerManager will skip
             * calling taskListener.onTaskCompleted(task, result).
             * This prevents a client application from seeing
             * (or having to handle) temporary slot or grid related
             * failures on RTasks.
             */
            RTaskResultImpl resultImpl = (RTaskResultImpl) result;
            resultImpl.repeatTask = true;

            /*
             * Now re-submit for execution using the priority
             * queue to expedite processing.
             */
            try {
                submit(task, true);
            } catch (Exception tex) {
                /*
                 * The task could not be queued again, so it will
                 * never produce another result. Clear repeatTask so
                 * the original failure is no longer suppressed;
                 * otherwise the task would vanish without the client
                 * ever being told.
                 * NOTE(review): assumes the listener manager reads
                 * repeatTask after this callback returns, as the
                 * existing suppression mechanism implies - confirm.
                 */
                resultImpl.repeatTask = false;
                System.out.println("PooledTaskBroker: " +
                        "callback, task re-submission ex=" + tex);
            }
        }

        int resizedPoolSize = parallelTaskLimit.decrementAndGet();

        if (brokerListener != null) {
            RBrokerException rbex;
            // "<= 0" rather than "== 0": should the counter ever drop
            // below zero the pool is still unusable, and reporting a
            // negative concurrency would be meaningless.
            if (resizedPoolSize <= 0) {
                rbex = new RBrokerException("DeployR grid " +
                        "failure detected, pool no longer operational, " +
                        "advise RBroker shutdown.");
            } else {
                rbex = new RBrokerException("DeployR grid " +
                        "failure detected, pool size auto-adjusted, " +
                        "max concurrency now " +
                        resizedPoolSize + ".");
            }
            brokerListener.onRuntimeError(rbex);
        }

    } else {

        /*
         * On success or on non-grid failures return the RProject
         * to the pool for use by pending/future tasks.
         */
        if (rProject != null) {
            boolean added = resourceTokenPool.add(rProject);
            if (!added) {
                System.out.println("PooledTaskBroker: " +
                        "callback, project could not be added " +
                        "back to pool?");
            }
        } else {
            System.out.println("PooledTaskBroker: " +
                    "callback, task does not have " +
                    "matching project?");
        }
    }
}