in computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSendManager.java [219:257]
/**
 * Sorts and sends every non-empty buffer still held by the given
 * partitions, then blocks until all of the submitted tasks have finished.
 *
 * @param all  partitions to flush, keyed by partition id
 * @param type message type passed through to each sort-then-send task
 * @return the sum of {@code messageWritten()} over all given partitions
 * @throws ComputerException if waiting for a task times out, the waiting
 *                           thread is interrupted, or a task fails
 */
private MessageStat sortAndSendLastBuffer(
        Map<Integer, MessageSendPartition> all,
        MessageType type) {
    MessageStat messageWritten = new MessageStat();
    List<Future<?>> futures = new ArrayList<>(all.size());
    // Sort and send the last buffer
    for (Map.Entry<Integer, MessageSendPartition> entry : all.entrySet()) {
        int partitionId = entry.getKey();
        MessageSendPartition partition = entry.getValue();
        /*
         * If the last buffer has already been sorted and sent (empty),
         * there is no need to send again here
         */
        for (WriteBuffers buffer : partition.buffers()) {
            if (!buffer.isEmpty()) {
                buffer.prepareSorting();
                futures.add(this.sortThenSend(partitionId, type, buffer));
            }
        }
        // Record total message count & bytes
        messageWritten.increase(partition.messageWritten());
    }
    // NOTE(review): presumably rethrows an error recorded by an earlier
    // asynchronous task before we start waiting - confirm against
    // checkException()
    this.checkException();
    // Wait all future finished
    try {
        for (Future<?> future : futures) {
            future.get(Constants.FUTURE_TIMEOUT, TimeUnit.SECONDS);
        }
    } catch (TimeoutException e) {
        throw new ComputerException("Timed out to wait for sorting task " +
                                    "to finished", e);
    } catch (InterruptedException e) {
        /*
         * Catching InterruptedException clears the thread's interrupt
         * status; restore it so callers further up the stack can still
         * observe that an interrupt was requested.
         */
        Thread.currentThread().interrupt();
        throw new ComputerException("Failed to wait for sorting task " +
                                    "to finished", e);
    } catch (ExecutionException e) {
        throw new ComputerException("Failed to wait for sorting task " +
                                    "to finished", e);
    }
    return messageWritten;
}