public void visit()

in src/main/java/org/apache/accumulo/testing/randomwalk/Module.java [199:383]


  public void visit(final State state, final RandWalkEnv env, Properties props) throws Exception {
    int maxHops, maxSec;
    boolean teardown;

    Properties initProps = getProps("_init");
    initProps.putAll(props);
    String prop;
    if ((prop = initProps.getProperty("maxHops")) == null || prop.equals("0") || prop.equals(""))
      maxHops = Integer.MAX_VALUE;
    else
      maxHops = Integer.parseInt(initProps.getProperty("maxHops", "0"));

    if ((prop = initProps.getProperty("maxSec")) == null || prop.equals("0") || prop.equals(""))
      maxSec = Integer.MAX_VALUE;
    else
      maxSec = Integer.parseInt(initProps.getProperty("maxSec", "0"));

    teardown = (prop = initProps.getProperty("teardown")) == null || prop.equals("true")
        || prop.equals("");

    if (fixture != null) {
      fixture.setUp(state, env);
    }

    ExecutorService service = ThreadPools.getServerThreadPools().getPoolBuilder("RandomWalk Runner")
        .numCoreThreads(1).build();

    try {
      Node initNode = getNode(initNodeId);

      boolean test = false;
      if (initNode instanceof Test) {
        startTimer(initNode);
        test = true;
      }
      initNode.visit(state, env, getProps(initNodeId));
      if (test)
        stopTimer(initNode);

      // update aliases
      Set<String> aliases;
      if ((aliases = aliasMap.get(initNodeId)) != null)
        for (String alias : aliases) {
          ((Alias) nodes.get(alias)).update(initNodeId);
        }

      String curNodeId = initNodeId;
      int numHops = 0;
      long startTime = System.currentTimeMillis() / 1000;
      while (true) {
        // check if END state was reached
        if (curNodeId.equalsIgnoreCase("END")) {
          log.debug("reached END state");
          break;
        }
        // check if maxSec was reached
        long curTime = System.currentTimeMillis() / 1000;
        if ((curTime - startTime) > maxSec) {
          log.debug("reached maxSec(" + maxSec + ")");
          break;
        }

        // The number of seconds before the test should exit
        long secondsRemaining = maxSec - (curTime - startTime);

        // check if maxHops was reached
        if (numHops > maxHops) {
          log.debug("reached maxHops(" + maxHops + ")");
          break;
        }
        numHops++;

        if (!adjMap.containsKey(curNodeId) && !curNodeId.startsWith("alias.")) {
          throw new Exception(
              "Reached node(" + curNodeId + ") without outgoing edges in module(" + this + ")");
        }
        AdjList adj = adjMap.get(curNodeId);
        String nextNodeId = adj.randomNeighbor();
        final Node nextNode;
        Node nextNodeOrAlias = getNode(nextNodeId);
        if (nextNodeOrAlias instanceof Alias) {
          nextNodeId = ((Alias) nextNodeOrAlias).getTargetId();
          nextNode = ((Alias) nextNodeOrAlias).get();
        } else {
          nextNode = nextNodeOrAlias;
        }
        final Properties nodeProps = getProps(nextNodeId);
        try {
          test = false;
          if (nextNode instanceof Test) {
            startTimer(nextNode);
            test = true;
          }

          // Wrap the visit of the next node in the module in a
          // callable that returns a thrown exception
          FutureTask<Exception> task = new FutureTask<>(() -> {
            try {
              nextNode.visit(state, env, nodeProps);
              return null;
            } catch (Exception e) {
              return e;
            }
          });

          // Run the task (should execute immediately)
          service.submit(task);

          Exception nodeException;
          try {
            // Bound the time we'll wait for the node to complete
            nodeException = task.get(secondsRemaining, TimeUnit.SECONDS);
          } catch (InterruptedException e) {
            log.warn("Interrupted waiting for " + nextNode.getClass().getSimpleName()
                + " to complete. Exiting.", e);
            break;
          } catch (ExecutionException e) {
            log.error("Caught error executing " + nextNode.getClass().getSimpleName(), e);
            throw e;
          } catch (TimeoutException e) {
            log.info("Timed out waiting for " + nextNode.getClass().getSimpleName()
                + " to complete (waited " + secondsRemaining + " seconds). Exiting.", e);
            break;
          }

          // The RandomWalk node throw an Exception that that Callable
          // handed back
          // Throw it and let the Module perform cleanup
          if (null != nodeException) {
            throw nodeException;
          }

          if (test)
            stopTimer(nextNode);
        } catch (Exception e) {
          log.debug("AccumuloClient belongs to user: " + env.getAccumuloClient().whoami());
          log.debug("Exception occured at: " + System.currentTimeMillis());
          log.debug("Properties for node: " + nextNodeId);
          for (Entry<Object,Object> entry : nodeProps.entrySet()) {
            log.debug("  " + entry.getKey() + ": " + entry.getValue());
          }
          log.debug("Overall Configuration Properties");
          for (Entry<Object,Object> entry : env.getTestProperties().entrySet()) {
            log.debug("  " + entry.getKey() + ": " + entry.getValue());
          }
          log.debug("State information");
          for (String key : new TreeSet<>(state.getMap().keySet())) {
            Object value = state.getMap().get(key);
            String logMsg = "  " + key + ": ";
            if (value == null)
              logMsg += "null";
            else if (value instanceof String || value instanceof Map || value instanceof Collection
                || value instanceof Number)
              logMsg += value;
            else if (value instanceof byte[])
              logMsg += new String((byte[]) value, UTF_8);
            else if (value instanceof PasswordToken)
              logMsg += new String(((PasswordToken) value).getPassword(), UTF_8);
            else
              logMsg += value.getClass() + " - " + value;

            log.debug(logMsg);
          }
          throw new Exception("Error running node " + nextNodeId, e);
        }

        // update aliases
        if ((aliases = aliasMap.get(curNodeId)) != null)
          for (String alias : aliases) {
            ((Alias) nodes.get(alias)).update(curNodeId);
          }

        curNodeId = nextNodeId;
      }
    } finally {
      if (null != service) {
        service.shutdownNow();
      }
    }

    if (teardown && (fixture != null)) {
      log.debug("tearing down module");
      fixture.tearDown(state, env);
    }
  }