in harry-core/src/harry/visitors/AllPartitionsValidator.java [95:135]
/**
 * Validates every partition position in {@code [0, maxPosition)}, where {@code maxPosition} is
 * {@code pdSelector.maxPosition(tracker.maxStarted())}.
 *
 * <p>Starts {@code concurrency} looping workers that claim positions from a shared counter. For
 * each claimed position the partition is selected and validated against the model twice: first
 * in reverse order, then in forward order. The calling thread blocks until the signal is raised,
 * then shuts the workers down.
 *
 * <p>NOTE(review): if {@code concurrency} is not positive, no worker is started and nothing in
 * this method raises the signal the caller waits on - confirm the value is validated elsewhere.
 *
 * @throws Throwable the result of {@code Runner.merge(errors)} if any worker recorded an error
 */
protected void validateAllPartitions() throws Throwable
{
    List<Interruptible> threads = new ArrayList<>();
    WaitQueue queue = WaitQueue.newWaitQueue();
    // Registered before any worker is started; the calling thread waits on this signal below.
    WaitQueue.Signal interrupt = queue.register();
    // Workers add to this list concurrently, hence the concurrent list implementation.
    List<Throwable> errors = new CopyOnWriteArrayList<>();

    final long maxPosition = pdSelector.maxPosition(tracker.maxStarted());
    // Shared cursor: every loop iteration of every worker claims the next unclaimed position.
    AtomicLong currentPosition = new AtomicLong();
    for (int i = 0; i < concurrency; i++)
    {
        // interrupt::signal and errors::add are handed to Runner.wrapInterrupt; presumably they are
        // invoked when the task fails - confirm against Runner.
        Interruptible thread = ExecutorFactory.Global.executorFactory().infiniteLoop(
            String.format("AllPartitionsValidator-%d", i + 1),
            Runner.wrapInterrupt((state) -> {
                if (state != Interruptible.State.NORMAL)
                    return;

                long pos = currentPosition.getAndIncrement();
                if (pos >= maxPosition)
                {
                    // Every position has been claimed: wake the waiting caller. Nothing is
                    // validated on this iteration, so no metric is reported for it.
                    interrupt.signalAll();
                    return;
                }

                // Reported only when a partition is actually validated, so iterations that run
                // after the positions are exhausted do not inflate the count.
                metricReporter.validatePartition();
                for (boolean reverse : new boolean[]{ true, false })
                {
                    Query query = Query.selectPartition(schema, pdSelector.pd(pdSelector.minLtsAt(pos), schema), reverse);
                    model.validate(query);
                    // The logger receives the position narrowed to an int.
                    queryLogger.logSelectQuery((int) pos, query);
                }
            }, interrupt::signal, errors::add),
            SAFE, NON_DAEMON, UNSYNCHRONIZED);
        threads.add(thread);
    }

    // Block until a worker signals.
    interrupt.awaitUninterruptibly();

    Runner.shutdown(threads::stream);

    if (!errors.isEmpty())
        throw Runner.merge(errors);
}