private void doTestRandomPutRemoveMultithreaded()

in modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/AbstractBplusTreePageMemoryTest.java [2576:2752]


    /**
     * Runs random put/remove/invoke operations against a test tree from several workers while other workers
     * concurrently read ranges from the same tree, then compares the final tree content with a reference map.
     *
     * <p>Three groups of workers are started through {@code runMultiThreadedAsync}:
     * <ul>
     *     <li>"put-remove": picks a random key and one of four operations (put, remove, put via invoke, remove via
     *     invoke), applies it to both the tree and the reference map while holding the stripe lock of that key;</li>
     *     <li>"printLocks": prints {@code TestTree.printLocks()} once per second until {@code stop} is set;</li>
     *     <li>"find": repeatedly reads a random range and checks that every returned value lies within the requested
     *     bounds and that values come in strictly ascending order.</li>
     * </ul>
     *
     * <p>Once the "put-remove" workers are done, {@code stop} is raised, all workers are awaited and the tree is
     * verified: every row must be present in the reference map, sizes must match, {@code validateTree()} must pass and
     * {@code assertNoLocks()} must hold.
     *
     * @param canGetRow Flag passed as is to {@code createTestTree}.
     * @throws Exception If failed.
     */
    private void doTestRandomPutRemoveMultithreaded(boolean canGetRow) throws Exception {
        final TestTree tree = createTestTree(canGetRow);

        // Reference model: updated together with the tree under the same per-key lock.
        final Map<Long, Long> expected = new ConcurrentHashMap<>();

        final int iterations = reuseList != null ? 30_000 : 10_000;

        final IgniteStripedLock stripes = new IgniteStripedLock(256);

        // Used only for the progress output; indexes match the operation codes handled in the switch below.
        final String[] opNames = {"put", "rmv", "inv_put", "inv_rmv"};

        CompletableFuture<?> modifiersFut = runMultiThreadedAsync(() -> {
            for (int iter = 0; iter < iterations && !stop.get(); iter++) {
                final Long key = (long) DataStructure.randomInt(CNT);
                final int opIdx = DataStructure.randomInt(4);

                if (iter % 5_000 == 0) {
                    println(" --> " + opNames[opIdx] + "_" + iter + "  " + key);
                }

                // Operations on the same key are serialized so that the tree and the map are changed consistently.
                Lock keyLock = stripes.getLock(key.longValue());

                keyLock.lock();

                try {
                    switch (opIdx) {
                        case 0: // Put.
                            assertEquals(expected.put(key, key), tree.put(key));

                            assertNoLocks();

                            break;

                        case 1: // Remove.
                            if (expected.remove(key) != null) {
                                assertEquals(key, tree.remove(key));

                                assertNoLocks();
                            }

                            // At this point the key must be absent from the tree.
                            assertNull(tree.remove(key));

                            assertNoLocks();

                            break;

                        case 2: { // Put through invoke.
                            InvokeClosure<Long> putClosure = new InvokeClosure<>() {
                                OperationType opType;

                                @Override
                                public void call(@Nullable Long row) {
                                    opType = PUT;

                                    if (row == null) {
                                        return;
                                    }

                                    assertEquals(key, row);
                                }

                                @Override
                                public Long newRow() {
                                    return key;
                                }

                                @Override
                                public OperationType operationType() {
                                    return opType;
                                }
                            };

                            tree.invoke(key, null, putClosure);

                            expected.put(key, key);

                            break;
                        }

                        case 3: { // Remove through invoke.
                            InvokeClosure<Long> removeClosure = new InvokeClosure<>() {
                                OperationType opType;

                                @Override
                                public void call(@Nullable Long row) {
                                    if (row == null) {
                                        opType = NOOP;

                                        return;
                                    }

                                    assertEquals(key, row);

                                    opType = REMOVE;
                                }

                                @Override
                                public Long newRow() {
                                    return null;
                                }

                                @Override
                                public OperationType operationType() {
                                    return opType;
                                }
                            };

                            tree.invoke(key, null, removeClosure);

                            expected.remove(key);

                            break;
                        }

                        default:
                            fail();
                    }
                } finally {
                    keyLock.unlock();
                }
            }

            return null;
        }, CPUS, "put-remove");

        CompletableFuture<?> lockPrinterFut = runMultiThreadedAsync(() -> {
            while (!stop.get()) {
                Thread.sleep(1_000);

                println(TestTree.printLocks());
            }

            return null;
        }, 1, "printLocks");

        CompletableFuture<?> findersFut = runMultiThreadedAsync(() -> {
            while (!stop.get()) {
                int lower = DataStructure.randomInt(CNT);
                int upper = lower + DataStructure.randomInt(CNT - lower);

                Cursor<Long> range = tree.find((long) lower, (long) upper);

                Long prev = null;

                while (range.hasNext()) {
                    Long cur = range.next();

                    String boundsMsg = lower + " <= " + cur + " <= " + upper;

                    // Correct bounds.
                    assertTrue(cur >= lower, boundsMsg);
                    assertTrue(cur <= upper, boundsMsg);

                    // No duplicates.
                    if (prev != null) {
                        assertTrue(cur > prev, lower + " <= " + prev + " < " + cur + " <= " + upper);
                    }

                    prev = cur;
                }

                TestTreeFindFirstClosure firstFinder = new TestTreeFindFirstClosure();

                tree.iterate((long) lower, (long) upper, firstFinder);

                Long found = firstFinder.val;

                if (found != null) {
                    String boundsMsg = lower + " <= " + found + " <= " + upper;

                    assertTrue(found >= lower, boundsMsg);
                    assertTrue(found <= upper, boundsMsg);
                }
            }

            return null;
        }, 4, "find");

        asyncRunFut = CompletableFuture.allOf(modifiersFut, lockPrinterFut, findersFut);

        try {
            modifiersFut.get(getTestTimeout(), MILLISECONDS);
        } finally {
            // The auxiliary workers loop until the flag is raised, so raise it before waiting for all of them.
            stop.set(true);

            asyncRunFut.get(getTestTimeout(), MILLISECONDS);
        }

        // Every row left in the tree must be known to the reference map.
        for (Cursor<Long> all = tree.find(null, null); all.hasNext(); ) {
            Long stored = all.next();

            assert stored != null;

            assertEquals(expected.get(stored), stored);
        }

        info("size: " + expected.size());

        assertEquals(expected.size(), tree.size());

        tree.validateTree();

        assertNoLocks();
    }