public void visit()

in src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java [50:126]


  /**
   * Waits for the bulk-import thread pool to finish, then verifies the contents of the test table
   * and deletes the table when verification passes.
   *
   * <p>
   * Verification is skipped (and the table is left in place) unless the {@code bulkImportSuccess}
   * state entry equals {@code "true"}. Otherwise two scans are run over the table:
   * <ol>
   * <li>every value in {@code BulkPlusOne.CHECK_COLUMN_FAMILY} must equal {@code zero};</li>
   * <li>within each row, the {@code BulkPlusOne.MARKER_CF} column qualifiers must parse as the
   * consecutive numbers 1, 2, 3, ... ending at {@code BulkPlusOne.counter}, and every marker value
   * must be the string {@code "1"}.</li>
   * </ol>
   * Every violation is logged; a single exception is thrown after both scans complete so that one
   * run reports all problems.
   *
   * @param state
   *          shared test state; supplies the thread pool and the {@code bulkImportSuccess} flag
   * @param env
   *          environment used to obtain the Accumulo client
   * @param props
   *          not used by this method
   * @throws Exception
   *           if any verification check failed, or if a client operation or the wait on the thread
   *           pool fails
   */
  public void visit(State state, RandWalkEnv env, Properties props) throws Exception {
    ThreadPoolExecutor threadPool = Setup.getThreadPool(state);
    threadPool.shutdown();
    int lastSize = 0;
    while (!threadPool.isTerminated()) {
      int size = threadPool.getQueue().size() + threadPool.getActiveCount();
      log.info("Waiting for {} nodes to complete", size);
      // Report progress only when the amount of outstanding work changed since the last poll.
      // NOTE(review): makingProgress() is defined outside this block; presumably it keeps the
      // framework from treating this wait as a hang - confirm.
      if (size != lastSize) {
        makingProgress();
      }
      lastSize = size;
      threadPool.awaitTermination(10, TimeUnit.SECONDS);
    }

    if (!"true".equals(state.get("bulkImportSuccess"))) {
      log.info("Not verifying bulk import test due to import failures");
      return;
    }

    String user = env.getAccumuloClient().whoami();
    Authorizations auths = env.getAccumuloClient().securityOperations().getUserAuthorizations(user);
    boolean errorFound = false;
    try (Scanner scanner = env.getAccumuloClient().createScanner(Setup.getTableName(), auths)) {
      // Pass 1: every value in the check column family must equal zero.
      scanner.fetchColumnFamily(BulkPlusOne.CHECK_COLUMN_FAMILY);
      for (Entry<Key,Value> entry : scanner) {
        byte[] value = entry.getValue().get();
        if (!Arrays.equals(value, zero)) {
          log.error("Bad key found at {}", entry);
          errorFound = true;
        }
      }

      // Pass 2: reuse the scanner for the marker column family and check it row by row.
      scanner.clearColumns();
      scanner.fetchColumnFamily(BulkPlusOne.MARKER_CF);
      RowIterator rowIter = new RowIterator(scanner);

      while (rowIter.hasNext()) {
        Iterator<Entry<Key,Value>> row = rowIter.next();
        // Starting at 0 means the first marker in each row is expected to be 1.
        long prev = 0;
        Text rowText = null;
        while (row.hasNext()) {
          Entry<Key,Value> entry = row.next();

          // Remember the row id from the first entry for the end-of-row message below.
          if (rowText == null) {
            rowText = entry.getKey().getRow();
          }

          long curr = Long.parseLong(entry.getKey().getColumnQualifier().toString());

          // Markers must increase by exactly one with no gaps.
          if (curr - 1 != prev) {
            log.error("Bad marker count. Current row: {} {}, Previous row marker: {}",
                entry.getKey(), entry.getValue(), prev);
            errorFound = true;
          }

          if (!entry.getValue().toString().equals("1")) {
            log.error("Bad marker value for row {} {}.\n Value expected to be one", entry.getKey(),
                entry.getValue());
            errorFound = true;
          }

          prev = curr;
        }

        // The last marker seen in the row must match the current counter value.
        if (BulkPlusOne.counter.get() != prev) {
          log.error("Row {} does not have all markers. Current marker: {}, Previous marker:{}",
              rowText, BulkPlusOne.counter.get(), prev);
          errorFound = true;
        }
      }

      // Fail only after both passes so that all problems are logged; the table is not deleted.
      if (errorFound) {
        throw new Exception("Error found during Verify");
      }
    }

    log.info("Test successful on table {}", Setup.getTableName());
    env.getAccumuloClient().tableOperations().delete(Setup.getTableName());
  }