in src/main/java/org/apache/accumulo/testing/continuous/ContinuousScanner.java [38:92]
/** Width of the scanned row range used at startup and after the width would overflow. */
private static final long INITIAL_SCAN_DISTANCE = 1_000_000_000_000L;

/**
 * Repeatedly scans a randomly positioned row range of the test table, validates every entry that
 * is returned, and adapts the width of the range so that each scan returns roughly
 * {@code CI_SCANNER_ENTRIES} entries. The loop never terminates normally.
 *
 * @param args
 *          command line arguments, passed unchanged to {@link ContinuousEnv}
 * @throws Exception
 *           if the environment cannot be created or a scan fails
 */
public static void main(String[] args) throws Exception {
  try (ContinuousEnv env = new ContinuousEnv(args)) {
    long distance = INITIAL_SCAN_DISTANCE;

    AccumuloClient client = env.getAccumuloClient();
    int numToScan = Integer.parseInt(env.getTestProperty(TestProps.CI_SCANNER_ENTRIES));
    int scannerSleepMs = Integer.parseInt(env.getTestProperty(TestProps.CI_SCANNER_SLEEP_MS));
    ConsistencyLevel cl = TestProps
        .getScanConsistencyLevel(env.getTestProperty(TestProps.CI_SCANNER_CONSISTENCY_LEVEL));

    // Accepted relative deviation from numToScan: 5% up to 1000 entries, proportionally tighter
    // for larger targets.
    double delta = Math.min(.05, .05 / (numToScan / 1000.0));

    try (Scanner scanner = ContinuousUtil.createScanner(client, env.getAccumuloTableName(),
        env.getRandomAuthorizations())) {
      while (true) {
        long startRow = ContinuousIngest.genLong(env.getRowMin(), env.getRowMax() - distance,
            env.getRandom());
        byte[] scanStart = ContinuousIngest.genRow(startRow);
        byte[] scanStop = ContinuousIngest.genRow(startRow + distance);

        scanner.setRange(new Range(new Text(scanStart), new Text(scanStop)));
        scanner.setConsistencyLevel(cl);

        long t1 = System.currentTimeMillis();

        // Validate inside the mapping function instead of a peek() before count(): Stream.count()
        // is allowed to skip the pipeline (and therefore the peek action) when it can determine
        // the size from the source, whereas sum() has to visit every element.
        long count = scanner.stream().mapToLong(entry -> {
          ContinuousWalk.validate(entry.getKey(), entry.getValue());
          return 1L;
        }).sum();

        long t2 = System.currentTimeMillis();

        distance = adjustScanDistance(distance, count, numToScan, delta);

        log.debug("SCAN - start: {}ms, start row: {}, duration: {}ms, total scanned: {}", t1,
            new String(scanStart, UTF_8), (t2 - t1), count);

        if (scannerSleepMs > 0) {
          sleepUninterruptibly(scannerSleepMs, TimeUnit.MILLISECONDS);
        }
      }
    }
  }
}

/**
 * Computes the row-range width to use for the next scan.
 *
 * @param distance
 *          width used for the scan that just finished
 * @param count
 *          number of entries that scan returned
 * @param numToScan
 *          desired number of entries per scan
 * @param delta
 *          accepted relative deviation of {@code count} from {@code numToScan}
 * @return {@code distance} unchanged when {@code count} is within tolerance, otherwise a widened
 *         or narrowed width that is always at least 1
 */
private static long adjustScanDistance(long distance, long count, int numToScan, double delta) {
  boolean outsideTolerance = count < (1 - delta) * numToScan || count > (1 + delta) * numToScan;
  if (!outsideTolerance) {
    return distance;
  }

  if (count == 0) {
    // Nothing was found, so widen aggressively. Check before multiplying: an overflowing product
    // can wrap to a positive value, which a sign test after the fact would not catch.
    return distance > Long.MAX_VALUE / 10 ? INITIAL_SCAN_DISTANCE : distance * 10;
  }

  double ratio = (double) numToScan / count;
  // move ratio closer to 1 to make change slower
  ratio = ratio - (ratio - 1.0) * (2.0 / 3.0);

  // Keep the width positive: a width truncated to zero could never grow again, because the
  // empty-scan branch widens by multiplication.
  return Math.max(1L, (long) (ratio * distance));
}