in src/main/java/org/apache/accumulo/testing/randomwalk/Module.java [199:383]
/**
 * Walks this module's graph, starting at the init node and hopping to a randomly chosen neighbor
 * until the END node is reached, a limit is hit, or a node fails.
 *
 * <p>
 * Limits and options are read from the module's "_init" properties overlaid with {@code props}:
 * <ul>
 * <li>{@code maxHops} - hop limit; missing, empty or "0" means no limit</li>
 * <li>{@code maxSec} - wall-clock limit in seconds; missing, empty or "0" means no limit</li>
 * <li>{@code teardown} - whether to tear the fixture down afterwards; missing, empty or "true"
 * enables it</li>
 * </ul>
 *
 * <p>
 * Each node after the init node is visited on a single-threaded executor so that the wait can be
 * bounded by the time remaining under {@code maxSec}. The fixture is torn down only when the walk
 * ends without throwing.
 *
 * @param state
 *          shared state handed to the fixture and to every visited node
 * @param env
 *          environment handed to the fixture and to every visited node
 * @param props
 *          properties that override the module's "_init" properties
 * @throws Exception
 *           if a node has no outgoing edges, or if visiting a node fails (the failure is wrapped
 *           in an exception whose message names the node)
 */
public void visit(final State state, final RandWalkEnv env, Properties props) throws Exception {
  Properties initProps = getProps("_init");
  initProps.putAll(props);

  // A missing, empty or "0" value disables the corresponding limit.
  String prop = initProps.getProperty("maxHops");
  final int maxHops;
  if (prop == null || prop.equals("0") || prop.equals("")) {
    maxHops = Integer.MAX_VALUE;
  } else {
    maxHops = Integer.parseInt(prop);
  }

  prop = initProps.getProperty("maxSec");
  final int maxSec;
  if (prop == null || prop.equals("0") || prop.equals("")) {
    maxSec = Integer.MAX_VALUE;
  } else {
    maxSec = Integer.parseInt(prop);
  }

  prop = initProps.getProperty("teardown");
  final boolean teardown = prop == null || prop.equals("true") || prop.equals("");

  if (fixture != null) {
    fixture.setUp(state, env);
  }

  ExecutorService service = ThreadPools.getServerThreadPools().getPoolBuilder("RandomWalk Runner")
      .numCoreThreads(1).build();

  try {
    // The init node is visited directly on the calling thread, without a time bound.
    Node initNode = getNode(initNodeId);
    boolean test = false;
    if (initNode instanceof Test) {
      startTimer(initNode);
      test = true;
    }
    initNode.visit(state, env, getProps(initNodeId));
    if (test) {
      stopTimer(initNode);
    }

    // update aliases
    Set<String> aliases = aliasMap.get(initNodeId);
    if (aliases != null) {
      for (String alias : aliases) {
        ((Alias) nodes.get(alias)).update(initNodeId);
      }
    }

    String curNodeId = initNodeId;
    int numHops = 0;
    long startTime = System.currentTimeMillis() / 1000;

    while (true) {
      // check if END state was reached
      if (curNodeId.equalsIgnoreCase("END")) {
        log.debug("reached END state");
        break;
      }

      // check if maxSec was reached
      long curTime = System.currentTimeMillis() / 1000;
      if ((curTime - startTime) > maxSec) {
        log.debug("reached maxSec(" + maxSec + ")");
        break;
      }

      // The number of seconds before the test should exit
      long secondsRemaining = maxSec - (curTime - startTime);

      // check if maxHops was reached
      // NOTE(review): with '>' and the increment after the check, maxHops + 1 hops are taken
      // before this triggers - confirm whether that is intended.
      if (numHops > maxHops) {
        log.debug("reached maxHops(" + maxHops + ")");
        break;
      }
      numHops++;

      if (!adjMap.containsKey(curNodeId) && !curNodeId.startsWith("alias.")) {
        throw new Exception(
            "Reached node(" + curNodeId + ") without outgoing edges in module(" + this + ")");
      }
      AdjList adj = adjMap.get(curNodeId);
      if (adj == null) {
        // The check above lets "alias." ids through; fail with a clear message rather than
        // with a NullPointerException on the next line.
        throw new Exception(
            "No adjacency list for node(" + curNodeId + ") in module(" + this + ")");
      }
      String nextNodeId = adj.randomNeighbor();

      // Resolve an alias to the node (and node id) it currently points at.
      final Node nextNode;
      Node nextNodeOrAlias = getNode(nextNodeId);
      if (nextNodeOrAlias instanceof Alias) {
        nextNodeId = ((Alias) nextNodeOrAlias).getTargetId();
        nextNode = ((Alias) nextNodeOrAlias).get();
      } else {
        nextNode = nextNodeOrAlias;
      }

      final Properties nodeProps = getProps(nextNodeId);
      try {
        test = false;
        if (nextNode instanceof Test) {
          startTimer(nextNode);
          test = true;
        }

        // Wrap the visit of the next node in the module in a
        // callable that returns a thrown exception
        FutureTask<Exception> task = new FutureTask<>(() -> {
          try {
            nextNode.visit(state, env, nodeProps);
            return null;
          } catch (Exception e) {
            return e;
          }
        });

        // Run the task (should execute immediately)
        service.submit(task);

        Exception nodeException;
        try {
          // Bound the time we'll wait for the node to complete
          nodeException = task.get(secondsRemaining, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
          log.warn("Interrupted waiting for " + nextNode.getClass().getSimpleName()
              + " to complete. Exiting.", e);
          // Restore the interrupt status so code further up the stack can see it.
          Thread.currentThread().interrupt();
          break;
        } catch (ExecutionException e) {
          log.error("Caught error executing " + nextNode.getClass().getSimpleName(), e);
          throw e;
        } catch (TimeoutException e) {
          // The node is still running; shutdownNow() in the finally block is called on the
          // executor that runs it.
          log.info("Timed out waiting for " + nextNode.getClass().getSimpleName()
              + " to complete (waited " + secondsRemaining + " seconds). Exiting.", e);
          break;
        }

        // The RandomWalk node threw an Exception that the Callable handed back.
        // Throw it and let the Module perform cleanup
        if (null != nodeException) {
          throw nodeException;
        }

        if (test) {
          stopTimer(nextNode);
        }
      } catch (Exception e) {
        // Dump as much context as possible, but never let a failure while logging replace
        // the exception that actually stopped the walk.
        try {
          log.debug("AccumuloClient belongs to user: " + env.getAccumuloClient().whoami());
          log.debug("Exception occured at: " + System.currentTimeMillis());
          log.debug("Properties for node: " + nextNodeId);
          for (Entry<Object,Object> entry : nodeProps.entrySet()) {
            log.debug("  " + entry.getKey() + ": " + entry.getValue());
          }
          log.debug("Overall Configuration Properties");
          for (Entry<Object,Object> entry : env.getTestProperties().entrySet()) {
            log.debug("  " + entry.getKey() + ": " + entry.getValue());
          }
          log.debug("State information");
          // TreeSet gives the keys in sorted order
          for (String key : new TreeSet<>(state.getMap().keySet())) {
            Object value = state.getMap().get(key);
            String logMsg = "  " + key + ": ";
            if (value == null) {
              logMsg += "null";
            } else if (value instanceof String || value instanceof Map
                || value instanceof Collection || value instanceof Number) {
              logMsg += value;
            } else if (value instanceof byte[]) {
              logMsg += new String((byte[]) value, UTF_8);
            } else if (value instanceof PasswordToken) {
              // NOTE(review): this writes the password in clear text to the debug log -
              // confirm that is acceptable wherever these tests run.
              logMsg += new String(((PasswordToken) value).getPassword(), UTF_8);
            } else {
              logMsg += value.getClass() + " - " + value;
            }
            log.debug(logMsg);
          }
        } catch (Exception diagnosticFailure) {
          e.addSuppressed(diagnosticFailure);
        }
        throw new Exception("Error running node " + nextNodeId, e);
      }

      // update aliases
      aliases = aliasMap.get(curNodeId);
      if (aliases != null) {
        for (String alias : aliases) {
          ((Alias) nodes.get(alias)).update(curNodeId);
        }
      }

      curNodeId = nextNodeId;
    }
  } finally {
    service.shutdownNow();
  }

  if (teardown && (fixture != null)) {
    log.debug("tearing down module");
    fixture.tearDown(state, env);
  }
}