private void testMergeKvInputs()

in computer-test/src/main/java/org/apache/hugegraph/computer/core/sort/sorter/SorterTest.java [177:266]


    /**
     * Writes ten kv input files (alternating between two fixed key/value
     * lists), merges them into two output files through a
     * {@code CombineKvOuterSortFlusher} built on an
     * {@code IntValueSumCombiner}, then reads the outputs back and asserts
     * that the per-key totals match the expected result.
     *
     * <p>All input and output files are removed when the method exits,
     * whether it succeeds or fails.
     *
     * @param config test configuration; its
     *               {@code TRANSPORT_RECV_FILE_MODE} option selects whether
     *               inputs are written with {@code bufferFileFromKvMap} or
     *               with {@code hgkvDirFromKvMap}
     * @throws Exception if writing, merging or reading the files fails
     */
    private void testMergeKvInputs(Config config) throws Exception {
        // Flattened (key, value) pairs: (2,3) (2,1) (5,2) (6,9) (6,2)
        List<Integer> map1 = ImmutableList.of(2, 3,
                                              2, 1,
                                              5, 2,
                                              6, 9,
                                              6, 2);
        // Flattened (key, value) pairs: (1,3) (1,2) (3,2)
        List<Integer> map2 = ImmutableList.of(1, 3,
                                              1, 2,
                                              3, 2);

        // Input hgkvDirs
        String file1Name = StoreTestUtil.availablePathById("1");
        String file2Name = StoreTestUtil.availablePathById("2");
        String file3Name = StoreTestUtil.availablePathById("3");
        String file4Name = StoreTestUtil.availablePathById("4");
        String file5Name = StoreTestUtil.availablePathById("5");
        String file6Name = StoreTestUtil.availablePathById("6");
        String file7Name = StoreTestUtil.availablePathById("7");
        String file8Name = StoreTestUtil.availablePathById("8");
        String file9Name = StoreTestUtil.availablePathById("9");
        String file10Name = StoreTestUtil.availablePathById("10");

        List<String> inputs = Lists.newArrayList(file1Name, file2Name,
                                                 file3Name, file4Name,
                                                 file5Name, file6Name,
                                                 file7Name, file8Name,
                                                 file9Name, file10Name);
        // Output hgkvDirs
        String output1 = StoreTestUtil.availablePathById("20");
        String output2 = StoreTestUtil.availablePathById("21");
        List<String> outputs = ImmutableList.of(output1, output2);

        try {
            // Even-indexed inputs hold map1, odd-indexed inputs hold map2
            for (int i = 0; i < inputs.size(); i++) {
                List<Integer> map;
                if ((i & 1) == 0) {
                    map = map1;
                } else {
                    map = map2;
                }
                String input = inputs.get(i);
                if (config.get(ComputerOptions.TRANSPORT_RECV_FILE_MODE)) {
                    StoreTestUtil.bufferFileFromKvMap(map, input);
                } else {
                    StoreTestUtil.hgkvDirFromKvMap(config, map, input);
                }
            }

            // Merge file
            Sorter sorter = SorterTestUtil.createSorter(config);
            PointerCombiner combiner = SorterTestUtil.createPointerCombiner(
                                       IntValue::new,
                                       new IntValueSumCombiner());
            sorter.mergeInputs(inputs,
                               new CombineKvOuterSortFlusher(combiner),
                               outputs, false);

            // Assert sort result: 5 copies of each map, summed per key
            List<Integer> result = ImmutableList.of(1, 25,
                                                    2, 20,
                                                    3, 10,
                                                    5, 10,
                                                    6, 55);
            Iterator<Integer> resultIter = result.iterator();
            Iterator<KvEntry> iterator = sorter.iterator(outputs, false);
            // Fail with an assertion, not NoSuchElementException, when the
            // merge produced no entries at all
            Assert.assertTrue(iterator.hasNext());
            KvEntry last = iterator.next();
            int value = StoreTestUtil.dataFromPointer(last.value());
            while (true) {
                KvEntry current = null;
                if (iterator.hasNext()) {
                    current = iterator.next();
                    /*
                     * Consecutive entries that compare equal are folded into
                     * one running total (presumably the same key can show up
                     * in more than one output file).
                     */
                    if (last.compareTo(current) == 0) {
                        value += StoreTestUtil.dataFromPointer(
                                               current.value());
                        continue;
                    }
                }

                // Key changed or input exhausted: check the finished group
                Assert.assertEquals(StoreTestUtil.dataFromPointer(last.key()),
                                    resultIter.next());
                Assert.assertEquals(value, resultIter.next());

                if (current == null) {
                    break;
                }

                last = current;
                value = StoreTestUtil.dataFromPointer(last.value());
            }
            // Every expected (key, total) pair must have been consumed
            Assert.assertFalse(resultIter.hasNext());
        } finally {
            // Remove the test files even if the merge or an assertion fails
            FileUtil.deleteFilesQuietly(inputs);
            FileUtil.deleteFilesQuietly(outputs);
        }
    }