in src/main/java/org/apache/accumulo/testing/merkle/cli/GenerateHashes.java [150:236]
/**
 * Computes one digest per {@link Range} of the input table and writes the result for each range
 * as a single mutation to the output table.
 *
 * <p>
 * Each range is processed as its own task on a fixed-size thread pool. When
 * {@code iteratorPushdown} is set, a {@link DigestIterator} is attached to the scan and the scan
 * is expected to return exactly one entry whose value is used as the digest. Otherwise every
 * key/value in the range is serialized and fed to a local {@link MessageDigest}.
 *
 * <p>
 * A failure inside a task is logged and rethrown as a {@link RuntimeException} on the worker
 * thread; it is not propagated to the caller of this method. If the calling thread is interrupted
 * while waiting, outstanding tasks are cancelled, the interrupt flag is restored and the method
 * returns early.
 *
 * @param client
 *          client used to create the scanners and the batch writer
 * @param inputTableName
 *          table to read from
 * @param outputTableName
 *          table the per-range digests are written to; must already exist
 * @param digestName
 *          digest algorithm name, passed to {@code getDigestAlgorithm} and to the iterator
 * @param numThreads
 *          size of the thread pool
 * @param iteratorPushdown
 *          whether to compute the digest via a scan iterator instead of locally
 * @param ranges
 *          ranges to digest; one task and one mutation per range
 * @throws IllegalArgumentException
 *           if the output table does not exist
 * @throws NoSuchAlgorithmException
 *           presumably when {@code digestName} is not a known algorithm (raised by
 *           {@code getDigestAlgorithm}, which is not visible here)
 */
public void run(final AccumuloClient client, final String inputTableName,
    final String outputTableName, final String digestName, int numThreads,
    final boolean iteratorPushdown, final Collection<Range> ranges)
    throws TableNotFoundException, AccumuloException, NoSuchAlgorithmException {
  if (!client.tableOperations().exists(outputTableName)) {
    throw new IllegalArgumentException(outputTableName + " does not exist, please create it");
  }

  try (BatchWriter bw = client.createBatchWriter(outputTableName)) {
    // Get some parallelism. The pool is created inside the writer's scope so that the finally
    // block below always stops the workers before the writer is closed.
    final ExecutorService svc = Executors.newFixedThreadPool(numThreads);
    try {
      for (final Range range : ranges) {
        // One digest instance per task; created here so an unknown algorithm fails the caller
        final MessageDigest digest = getDigestAlgorithm(digestName);
        svc.execute(() -> {
          final Scanner s;
          try {
            s = client.createScanner(inputTableName, Authorizations.EMPTY);
          } catch (Exception e) {
            log.error("Could not get scanner for " + inputTableName, e);
            throw new RuntimeException(e);
          }

          Value v;
          try {
            s.setRange(range);
            if (iteratorPushdown) {
              IteratorSetting cfg = new IteratorSetting(50, DigestIterator.class);
              cfg.addOption(DigestIterator.HASH_NAME_KEY, digestName);
              s.addScanIterator(cfg);
              // The scanner should only ever return us one
              // Key-Value, otherwise this approach won't work
              Entry<Key,Value> entry = Iterables.getOnlyElement(s);
              v = entry.getValue();
            } else {
              // Reuse one buffer and one stream for the whole range; the buffer is reset
              // after each entry so the digest only ever sees that entry's bytes.
              ByteArrayOutputStream baos = new ByteArrayOutputStream();
              DataOutputStream out = new DataOutputStream(baos);
              for (Entry<Key,Value> entry : s) {
                try {
                  entry.getKey().write(out);
                  entry.getValue().write(out);
                } catch (Exception e) {
                  log.error("Error writing {}", entry, e);
                  throw new RuntimeException(e);
                }
                digest.update(baos.toByteArray());
                baos.reset();
              }
              v = new Value(digest.digest());
            }
          } finally {
            // Always release the scanner, including when the scan or digest fails
            s.close();
          }

          Mutation m = RangeSerialization.toMutation(range, v);

          // Log some progress
          log.info("{} computed digest for {} of {}", Thread.currentThread().getName(), range,
              Hex.encodeHexString(v.get()));

          try {
            bw.addMutation(m);
          } catch (MutationsRejectedException e) {
            log.error("Could not write mutation", e);
            throw new RuntimeException(e);
          }
        });
      }

      svc.shutdown();

      // Wait indefinitely for the scans to complete
      try {
        while (!svc.awaitTermination(1, java.util.concurrent.TimeUnit.SECONDS)) {
          // still running; keep waiting
        }
      } catch (InterruptedException e) {
        log.error(
            "Interrupted while waiting for executor service to gracefully complete. Exiting now");
        svc.shutdownNow();
        // Restore the interrupt flag so callers can observe the interruption
        Thread.currentThread().interrupt();
        return;
      }
    } finally {
      // If we are leaving early (e.g. an exception while submitting tasks), do not leave
      // non-daemon worker threads behind.
      if (!svc.isTerminated()) {
        svc.shutdownNow();
      }
    }
  }
}