protected void validateAllPartitions()

in harry-core/src/harry/visitors/AllPartitionsValidator.java [95:135]


    /**
     * Validates every partition position below {@code pdSelector.maxPosition(tracker.maxStarted())},
     * querying each partition in both reverse and forward order.
     *
     * <p>{@code concurrency} looping workers are started; each iteration claims the next position
     * from a shared counter. The calling thread blocks until the registered signal is raised, which
     * a worker does once it claims a position at or past the maximum. The same signal is also handed
     * to {@code Runner.wrapInterrupt} together with the error sink (presumably so a failing worker
     * wakes the caller as well; confirm against {@code Runner.wrapInterrupt}). All workers are then
     * shut down via {@code Runner.shutdown}.
     *
     * @throws Throwable the result of {@code Runner.merge(errors)} if any error was collected
     */
    protected void validateAllPartitions() throws Throwable
    {
        List<Interruptible> threads = new ArrayList<>();
        WaitQueue queue = WaitQueue.newWaitQueue();
        WaitQueue.Signal interrupt = queue.register();
        // Written by the workers and read by the calling thread, hence a concurrent list.
        List<Throwable> errors = new CopyOnWriteArrayList<>();

        final long maxPosition = pdSelector.maxPosition(tracker.maxStarted());
        // Shared cursor: each worker iteration claims exactly one position.
        AtomicLong currentPosition = new AtomicLong();
        for (int i = 0; i < concurrency; i++)
        {
            Interruptible thread = ExecutorFactory.Global.executorFactory().infiniteLoop(
                String.format("AllPartitionsValidator-%d", i + 1),
                Runner.wrapInterrupt((state) -> {
                    if (state == Interruptible.State.NORMAL)
                    {
                        long pos = currentPosition.getAndIncrement();
                        if (pos < maxPosition)
                        {
                            // Report the metric only for positions that are actually validated.
                            // Workers keep looping after the positions run out (until shutdown),
                            // and those idle iterations must not inflate the count.
                            metricReporter.validatePartition();
                            for (boolean reverse : new boolean[]{ true, false })
                            {
                                Query query = Query.selectPartition(schema, pdSelector.pd(pdSelector.minLtsAt(pos), schema), reverse);
                                model.validate(query);
                                // Narrowing cast: a position above Integer.MAX_VALUE would wrap in the logged value.
                                queryLogger.logSelectQuery((int)pos, query);
                            }
                        }
                        else
                        {
                            // Nothing left to claim: wake the thread blocked on the signal below.
                            interrupt.signalAll();
                        }
                    }
                }, interrupt::signal, errors::add),
                SAFE, NON_DAEMON, UNSYNCHRONIZED);
            threads.add(thread);
        }

        // Block until a worker reports exhaustion or the signal is raised through wrapInterrupt.
        interrupt.awaitUninterruptibly();

        Runner.shutdown(threads::stream);
        if (!errors.isEmpty())
            throw Runner.merge(errors);
    }