private MessageStat sortAndSendLastBuffer()

in computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSendManager.java [219:257]


    /**
     * Sort and send every non-empty buffer still held by the given
     * partitions, then block until all of the submitted tasks complete.
     *
     * @param all  the partitions to flush, keyed by partition id
     * @param type the message type handed to each sort-then-send task
     * @return the sum of {@code messageWritten()} over all given partitions
     * @throws ComputerException if waiting for a task times out, the waiting
     *                           thread is interrupted, or a task fails
     */
    private MessageStat sortAndSendLastBuffer(
                        Map<Integer, MessageSendPartition> all,
                        MessageType type) {
        MessageStat messageWritten = new MessageStat();
        List<Future<?>> futures = new ArrayList<>(all.size());
        // Sort and send the last buffer
        for (Map.Entry<Integer, MessageSendPartition> entry : all.entrySet()) {
            int partitionId = entry.getKey();
            MessageSendPartition partition = entry.getValue();
            /*
             * If the last buffer has already been sorted and sent (empty),
             * there is no need to send again here
             */
            for (WriteBuffers buffer : partition.buffers()) {
                if (!buffer.isEmpty()) {
                    buffer.prepareSorting();
                    futures.add(this.sortThenSend(partitionId, type, buffer));
                }
            }
            // Record total message count & bytes
            messageWritten.increase(partition.messageWritten());
        }
        /*
         * NOTE(review): presumably rethrows a failure recorded by an earlier
         * asynchronous task - confirm against checkException()
         */
        this.checkException();

        // Wait all future finished
        try {
            for (Future<?> future : futures) {
                future.get(Constants.FUTURE_TIMEOUT, TimeUnit.SECONDS);
            }
        } catch (TimeoutException e) {
            throw new ComputerException("Timed out to wait for sorting task " +
                                        "to finished", e);
        } catch (InterruptedException e) {
            /*
             * Restore the interrupt status so that code further up the stack
             * can still observe the interruption after the exception is
             * wrapped into an unchecked one
             */
            Thread.currentThread().interrupt();
            throw new ComputerException("Failed to wait for sorting task " +
                                        "to finished", e);
        } catch (ExecutionException e) {
            throw new ComputerException("Failed to wait for sorting task " +
                                        "to finished", e);
        }

        return messageWritten;
    }