public void flush()

in computer-core/src/main/java/org/apache/hugegraph/computer/core/sort/flusher/CombineSubKvOuterSortFlusher.java [56:133]


    /**
     * Merges the given entries and writes them out through {@code writer},
     * combining sub-entries that compare equal under the same key.
     *
     * <p>For every key reported by the {@code SubKvSorter}, the key is
     * written to the internal output buffer followed by two fixed-int
     * placeholders (value length and sub-entry count). Sub-entries are then
     * appended one at a time; consecutive sub-entries for which
     * {@code compareTo} returns 0 are merged with {@code this.combiner}
     * before being written. Once the sorter reports no further sub-entry,
     * or {@code this.subKvFlushThreshold} sub-entries have been written,
     * the placeholders are back-filled, the buffered entry is handed to
     * {@code writer} and the buffer is rewound to offset 0. If sub-entries
     * remain, the same key is written again to start a new entry.
     *
     * @param entries source entries; must have at least one element
     * @param writer  destination for the merged entries; {@code finish()}
     *                is called on it after the last entry was written
     * @throws IOException propagated from the buffer or writer operations
     */
    public void flush(EntryIterator entries, KvEntryFileWriter writer)
                      throws IOException {
        E.checkArgument(entries.hasNext(), "Parameter entries can't be empty");

        PeekableIterator<KvEntry> peekable =
                PeekableIteratorAdaptor.of(entries);
        SubKvSorter subKvSorter = new SubKvSorter(peekable, this.sources);
        KvEntry ownerKv = subKvSorter.currentKv();

        do {
            // Open a new entry: key first, then two ints patched later on
            ownerKv.key().write(this.output);
            long lengthOffset = this.output.position();
            this.output.writeFixedInt(0); // value length placeholder
            this.output.writeFixedInt(0); // sub-entry count placeholder
            int subKvCount = 0;

            // Walk the sub-entries the sorter yields for ownerKv
            KvEntry pendingSubKv = subKvSorter.next();
            Pointer pendingValue = pendingSubKv.value();
            while (true) {
                KvEntry upcoming = null;
                if (subKvSorter.hasNext()) {
                    upcoming = subKvSorter.next();
                    if (pendingSubKv.compareTo(upcoming) == 0) {
                        // Compares equal: fold the value in, keep reading
                        pendingValue = this.combiner.combine(
                                       pendingValue, upcoming.value());
                        continue;
                    }
                }

                // Nothing more to combine into the pending sub-entry
                pendingSubKv.key().write(this.output);
                pendingValue.write(this.output);
                subKvCount++;

                boolean exhausted = upcoming == null;
                if (!exhausted &&
                    subKvCount != this.subKvFlushThreshold) {
                    // Entry stays open, move on to the next sub-entry
                    pendingSubKv = upcoming;
                    pendingValue = upcoming.value();
                    continue;
                }

                /*
                 * Either no sub-entry is left or the flush threshold was
                 * reached: back-fill both placeholders, then restore the
                 * write position to the end of the entry.
                 */
                long endOffset = this.output.position();
                long valueLength = endOffset - lengthOffset - Integer.BYTES;
                this.output.seek(lengthOffset);
                this.output.writeFixedInt((int) valueLength);
                this.output.writeFixedInt(subKvCount);
                this.output.seek(endOffset);

                // Hand the buffered entry to the file, then reuse the buffer
                RandomAccessInput bufferInput =
                        EntriesUtil.inputFromOutput(this.output);
                writer.write(EntriesUtil.kvEntryFromInput(bufferInput,
                                                          true, true));
                this.output.seek(0);

                if (exhausted) {
                    break;
                }

                // Sub-entries remain: start another entry under the same key
                ownerKv.key().write(this.output);
                lengthOffset = this.output.position();
                this.output.writeFixedInt(0); // value length placeholder
                this.output.writeFixedInt(0); // sub-entry count placeholder
                subKvCount = 0;

                pendingSubKv = upcoming;
                pendingValue = upcoming.value();
            }

            // Advance the sorter; a null current kv ends the merge
            subKvSorter.reset();
            ownerKv = subKvSorter.currentKv();
        } while (ownerKv != null);
        writer.finish();
    }