in src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java [50:126]
public void visit(State state, RandWalkEnv env, Properties props) throws Exception {
ThreadPoolExecutor threadPool = Setup.getThreadPool(state);
threadPool.shutdown();
int lastSize = 0;
while (!threadPool.isTerminated()) {
int size = threadPool.getQueue().size() + threadPool.getActiveCount();
log.info("Waiting for " + size + " nodes to complete");
if (size != lastSize)
makingProgress();
lastSize = size;
threadPool.awaitTermination(10, TimeUnit.SECONDS);
}
if (!"true".equals(state.get("bulkImportSuccess"))) {
log.info("Not verifying bulk import test due to import failures");
return;
}
String user = env.getAccumuloClient().whoami();
Authorizations auths = env.getAccumuloClient().securityOperations().getUserAuthorizations(user);
RowIterator rowIter;
boolean errorFound = false;
try (Scanner scanner = env.getAccumuloClient().createScanner(Setup.getTableName(), auths)) {
scanner.fetchColumnFamily(BulkPlusOne.CHECK_COLUMN_FAMILY);
for (Entry<Key,Value> entry : scanner) {
byte[] value = entry.getValue().get();
if (!Arrays.equals(value, zero)) {
log.error("Bad key found at {}", entry);
errorFound = true;
}
}
scanner.clearColumns();
scanner.fetchColumnFamily(BulkPlusOne.MARKER_CF);
rowIter = new RowIterator(scanner);
while (rowIter.hasNext()) {
Iterator<Entry<Key,Value>> row = rowIter.next();
long prev = 0;
Text rowText = null;
while (row.hasNext()) {
Entry<Key,Value> entry = row.next();
if (rowText == null) {
rowText = entry.getKey().getRow();
}
long curr = Long.parseLong(entry.getKey().getColumnQualifier().toString());
if (curr - 1 != prev) {
log.error("Bad market count. Current row: {} {}, Previous row marker: {}",
entry.getKey(), entry.getValue(), prev);
errorFound = true;
}
if (!entry.getValue().toString().equals("1")) {
log.error("Bad marker value for row {} {}.\n Value expected to be one", entry.getKey(),
entry.getValue());
errorFound = true;
}
prev = curr;
}
if (BulkPlusOne.counter.get() != prev) {
log.error("Row {} does not have all markers. Current marker: {}, Previous marker:{}",
rowText, BulkPlusOne.counter.get(), prev);
errorFound = true;
}
}
if (errorFound) {
throw new Exception("Error found during Verify");
}
}
log.info("Test successful on table " + Setup.getTableName());
env.getAccumuloClient().tableOperations().delete(Setup.getTableName());
}