in computer-test/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvBuffersTest.java [37:102]
public void testBufferToBuffers() {
long threshold = 1024L;
int size = 100;
MessageRecvBuffers buffers = new MessageRecvBuffers(threshold,
WAIT_TIMEOUT);
// It's ok to wait for empty buffers
buffers.waitSorted();
for (int i = 0; i < 10; i++) {
addMockBufferToBuffers(buffers, size);
}
Assert.assertFalse(buffers.full());
Assert.assertEquals(1000L, buffers.totalBytes());
Assert.assertThrows(ComputerException.class, () -> {
buffers.waitSorted();
}, e -> {
Assert.assertContains("Buffers have not been sorted in 100 ms",
e.getMessage());
});
addMockBufferToBuffers(buffers, size);
Assert.assertTrue(buffers.full());
List<RandomAccessInput> list = buffers.buffers();
Assert.assertEquals(11, list.size());
buffers.signalSorted();
List<RandomAccessInput> list2 = buffers.buffers();
Assert.assertEquals(11, list2.size());
Assert.assertEquals(1100L, buffers.totalBytes());
// It's ok to call waitSorted multi-times
buffers.waitSorted();
buffers.waitSorted();
// Next time again
buffers.prepareSort();
List<RandomAccessInput> list3 = buffers.buffers();
Assert.assertEquals(0, list3.size());
Assert.assertEquals(0L, buffers.totalBytes());
buffers.waitSorted();
for (int i = 0; i < 10; i++) {
addMockBufferToBuffers(buffers, size);
}
Assert.assertEquals(1000L, buffers.totalBytes());
Assert.assertFalse(buffers.full());
Assert.assertThrows(ComputerException.class, () -> {
buffers.waitSorted();
}, e -> {
Assert.assertContains("Buffers have not been sorted in 100 ms",
e.getMessage());
});
addMockBufferToBuffers(buffers, size);
Assert.assertTrue(buffers.full());
List<RandomAccessInput> list4 = buffers.buffers();
Assert.assertEquals(11, list4.size());
}