public void run()

in src/main/java/org/apache/accumulo/testing/merkle/cli/GenerateHashes.java [150:236]


  /**
   * Computes one digest per range of the input table and writes each result to the output table.
   *
   * <p>
   * Each range is handled by its own task on a fixed-size thread pool. This method blocks until
   * every task has finished or the calling thread is interrupted. Tasks are started with
   * {@link ExecutorService#execute(Runnable)}, so a failure inside a task is logged but is not
   * rethrown to the caller of this method.
   *
   * @param client
   *          client used to check for the output table and to create the writer and scanners
   * @param inputTableName
   *          table whose entries are scanned and hashed
   * @param outputTableName
   *          table the resulting mutations are written to; it must already exist
   * @param digestName
   *          name of the digest algorithm, passed to {@code getDigestAlgorithm} and to the
   *          {@link DigestIterator} configuration
   * @param numThreads
   *          number of threads in the pool that runs the per-range tasks
   * @param iteratorPushdown
   *          if {@code true}, a {@link DigestIterator} is attached to the scan and exactly one
   *          entry is expected back per range; otherwise the digest is computed in this process
   *          from every entry the scan returns
   * @param ranges
   *          ranges to hash; one task is submitted per range
   * @throws IllegalArgumentException
   *           if {@code outputTableName} does not exist
   */
  public void run(final AccumuloClient client, final String inputTableName,
      final String outputTableName, final String digestName, int numThreads,
      final boolean iteratorPushdown, final Collection<Range> ranges)
      throws TableNotFoundException, AccumuloException, NoSuchAlgorithmException {
    if (!client.tableOperations().exists(outputTableName)) {
      throw new IllegalArgumentException(outputTableName + " does not exist, please create it");
    }

    // Get some parallelism
    final ExecutorService svc = Executors.newFixedThreadPool(numThreads);
    boolean interrupted = false;

    try (BatchWriter bw = client.createBatchWriter(outputTableName)) {
      try {
        for (final Range range : ranges) {
          // Created on the calling thread so an unknown algorithm name fails this method directly
          final MessageDigest digest = getDigestAlgorithm(digestName);

          svc.execute(() -> {
            final Mutation m = computeRangeDigestMutation(client, inputTableName, digestName,
                iteratorPushdown, range, digest);

            try {
              bw.addMutation(m);
            } catch (MutationsRejectedException e) {
              log.error("Could not write mutation", e);
              throw new RuntimeException(e);
            }
          });
        }

        svc.shutdown();

        // Wait indefinitely for the scans to complete
        while (!svc.awaitTermination(1, java.util.concurrent.TimeUnit.SECONDS)) {
          // Tasks are still running; keep waiting
        }
      } catch (InterruptedException e) {
        log.error(
            "Interrupted while waiting for executor service to gracefully complete. Exiting now");
        interrupted = true;
      } finally {
        // A no-op once the pool has terminated. On interrupt or failure this asks the workers to
        // stop before the writer they share is closed.
        svc.shutdownNow();
      }
    } finally {
      // Also covers a failure to create the writer, in which case the inner block never ran
      svc.shutdownNow();
      if (interrupted) {
        // Restored only after the writer has been closed, so the close above is not attempted
        // with an interrupt pending
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Scans a single range of the input table, computes its digest and wraps it in a mutation. The
   * scanner is closed before this method returns, whether it succeeds or fails.
   *
   * @param digest
   *          digest instance used only when {@code iteratorPushdown} is {@code false}
   * @return the mutation produced by {@link RangeSerialization#toMutation} for the range and its
   *         digest value
   * @throws RuntimeException
   *           if the scanner cannot be created or an entry cannot be serialized
   */
  private Mutation computeRangeDigestMutation(final AccumuloClient client,
      final String inputTableName, final String digestName, final boolean iteratorPushdown,
      final Range range, final MessageDigest digest) {
    final Scanner s;
    try {
      s = client.createScanner(inputTableName, Authorizations.EMPTY);
    } catch (Exception e) {
      log.error("Could not get scanner for " + inputTableName, e);
      throw new RuntimeException(e);
    }

    try {
      s.setRange(range);

      final Value v;
      if (iteratorPushdown) {
        IteratorSetting cfg = new IteratorSetting(50, DigestIterator.class);
        cfg.addOption(DigestIterator.HASH_NAME_KEY, digestName);
        s.addScanIterator(cfg);

        // The scanner should only ever return us one
        // Key-Value, otherwise this approach won't work
        Entry<Key,Value> entry = Iterables.getOnlyElement(s);

        v = entry.getValue();
      } else {
        // The buffer is reset after every entry, so one stream pair serves the whole scan
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        for (Entry<Key,Value> entry : s) {
          try {
            entry.getKey().write(out);
            entry.getValue().write(out);
          } catch (Exception e) {
            log.error("Error writing {}", entry, e);
            throw new RuntimeException(e);
          }

          digest.update(baos.toByteArray());
          baos.reset();
        }

        v = new Value(digest.digest());
      }

      final Mutation m = RangeSerialization.toMutation(range, v);

      // Log some progress
      log.info("{} computed digest for {} of {}", Thread.currentThread().getName(), range,
          Hex.encodeHexString(v.get()));

      return m;
    } finally {
      s.close();
    }
  }