private void testMergeSubKvFiles()

in computer-test/src/main/java/org/apache/hugegraph/computer/core/sort/sorter/SorterTest.java [389:465]


    private void testMergeSubKvFiles(Config config) throws Exception {
        int flushThreshold = config.get(
                ComputerOptions.INPUT_MAX_EDGES_IN_ONE_VERTEX);

        List<Integer> kv1 = ImmutableList.of(1,
                                             2, 1,
                                             4, 1);
        List<Integer> kv2 = ImmutableList.of(4,
                                             2, 1,
                                             3, 1);
        List<Integer> kv3 = ImmutableList.of(4,
                                             6, 1,
                                             8, 1);
        List<Integer> kv4 = ImmutableList.of(1,
                                             1, 1,
                                             2, 1);
        List<Integer> kv5 = ImmutableList.of(1,
                                             5, 1,
                                             7, 1);
        List<Integer> kv6 = ImmutableList.of(2,
                                             2, 1,
                                             5, 1);

        List<List<Integer>> data1 = ImmutableList.of(kv1, kv2, kv3);
        List<List<Integer>> data2 = ImmutableList.of(kv4, kv5, kv6);
        List<List<Integer>> data3 = ImmutableList.of(kv4, kv1, kv3);
        List<List<List<Integer>>> datas = ImmutableList.of(data1, data2, data3);

        String input1 = StoreTestUtil.availablePathById(1);
        String input2 = StoreTestUtil.availablePathById(2);
        String input3 = StoreTestUtil.availablePathById(3);
        String output = StoreTestUtil.availablePathById(0);

        List<String> inputs = ImmutableList.of(input1, input2, input3);
        List<String> outputs = ImmutableList.of(output);

        boolean useBufferFile = config.get(
                ComputerOptions.TRANSPORT_RECV_FILE_MODE);
        for (int i = 0; i < inputs.size(); i++) {
            String input = inputs.get(i);
            List<List<Integer>> data = datas.get(i);
            if (useBufferFile) {
                StoreTestUtil.bufferFileFromSubKvMap(data, input);
            } else {
                StoreTestUtil.hgkvDirFromSubKvMap(config, data, input);
            }
        }

        Sorter sorter = SorterTestUtil.createSorter(config);
        PointerCombiner combiner = SorterTestUtil.createPointerCombiner(
                IntValue::new,
                new IntValueSumCombiner());
        OuterSortFlusher flusher = new CombineSubKvOuterSortFlusher(
                combiner, flushThreshold);
        flusher.sources(inputs.size());
        sorter.mergeInputs(inputs, flusher, outputs, true);

        /* Assert result
         * key 1 subKv 1 2 2 4
         * key 1 subKv 4 2 5 1
         * key 1 subKv 7 1
         * key 2 subKv 2 1 5 1
         * key 4 subKv 2 1 3 1
         * key 4 subKv 6 2 8 2
         */
        try (CIter<KvEntry> kvIter = sorter.iterator(outputs, true)) {
            SorterTestUtil.assertSubKvByKv(kvIter.next(), 1, 1, 2, 2, 4);
            SorterTestUtil.assertSubKvByKv(kvIter.next(), 1, 4, 2, 5, 1);
            SorterTestUtil.assertSubKvByKv(kvIter.next(), 1, 7, 1);
            SorterTestUtil.assertSubKvByKv(kvIter.next(), 2, 2, 1, 5, 1);
            SorterTestUtil.assertSubKvByKv(kvIter.next(), 4, 2, 1, 3, 1);
            SorterTestUtil.assertSubKvByKv(kvIter.next(), 4, 6, 2, 8, 2);
        }

        FileUtil.deleteFilesQuietly(inputs);
        FileUtil.deleteFilesQuietly(outputs);
    }