in computer-test/src/main/java/org/apache/hugegraph/computer/core/sender/MultiQueueTest.java [104:189]
public void testPutAndTakeWithPutAtFront() throws InterruptedException {
    /*
     * Two producer threads put messages in three rounds; each round is
     * gated by a latch that only the consumer thread opens. The consumer
     * takes the messages of a round, and in the second round it returns
     * one taken message with putAtFront() and checks that the next two
     * takes still yield only messages of that round.
     * Failures inside the worker threads are collected in `exceptions`
     * and re-raised on the calling thread after all threads are joined.
     */
    MultiQueue queue = new MultiQueue(2);
    Throwable[] exceptions = new Throwable[3];
    CountDownLatch[] latches = new CountDownLatch[3];
    for (int i = 0; i < latches.length; i++) {
        latches[i] = new CountDownLatch(1);
    }

    // Producer for index 0: puts partition ids 1, 3, 5, one per round
    Thread thread1 = new Thread(() -> {
        try {
            latches[0].await();
            queue.put(0, new QueuedMessage(1, MessageType.VERTEX, null));
            latches[1].await();
            queue.put(0, new QueuedMessage(3, MessageType.VERTEX, null));
            latches[2].await();
            queue.put(0, new QueuedMessage(5, MessageType.VERTEX, null));
        } catch (Throwable e) {
            exceptions[0] = e;
        }
    });

    // Producer for index 1: puts partition ids 2, 4, 6, one per round
    Thread thread2 = new Thread(() -> {
        try {
            latches[0].await();
            queue.put(1, new QueuedMessage(2, MessageType.VERTEX, null));
            latches[1].await();
            queue.put(1, new QueuedMessage(4, MessageType.VERTEX, null));
            latches[2].await();
            queue.put(1, new QueuedMessage(6, MessageType.VERTEX, null));
        } catch (Throwable e) {
            exceptions[1] = e;
        }
    });

    // Consumer: opens each round and verifies what it takes
    Thread thread3 = new Thread(() -> {
        try {
            // Round 1: expect partition ids 1 and 2, in either order
            latches[0].countDown();
            QueuedMessage message1 = queue.take();
            QueuedMessage message2 = queue.take();
            Assert.assertTrue(ImmutableSet.of(1, 2).contains(
                              message1.partitionId()));
            Assert.assertTrue(ImmutableSet.of(1, 2).contains(
                              message2.partitionId()));

            // Round 2: take one of 3/4 and hand it back
            latches[1].countDown();
            QueuedMessage message = queue.take();
            Assert.assertTrue(ImmutableSet.of(3, 4).contains(
                              message.partitionId()));
            // Put the message at the front of the original queue
            // (odd partition ids were put to index 0, even ones to 1)
            if ((message.partitionId() & 0x01) == 1) {
                queue.putAtFront(0, message);
            } else {
                queue.putAtFront(1, message);
            }
            Assert.assertTrue(ImmutableSet.of(3, 4).contains(
                              queue.take().partitionId()));
            Assert.assertTrue(ImmutableSet.of(3, 4).contains(
                              queue.take().partitionId()));

            // Round 3: expect partition ids 5 and 6, in either order
            latches[2].countDown();
            Assert.assertTrue(ImmutableSet.of(5, 6).contains(
                              queue.take().partitionId()));
            Assert.assertTrue(ImmutableSet.of(5, 6).contains(
                              queue.take().partitionId()));
        } catch (Throwable e) {
            exceptions[2] = e;
        } finally {
            /*
             * Open every latch even when a check above failed. Otherwise
             * the producers would wait on a latch forever and the join()
             * calls below would hang instead of reporting the failure.
             * countDown() on a latch that is already at zero is a no-op,
             * so the success path is unaffected.
             * NOTE(review): assumes put() does not block for the few
             * messages left to put - confirm against MultiQueue capacity.
             */
            for (CountDownLatch latch : latches) {
                latch.countDown();
            }
        }
    });

    thread1.start();
    thread2.start();
    thread3.start();

    thread1.join();
    thread2.join();
    thread3.join();

    // Re-raise the first worker failure, keeping its stack trace as cause
    for (int i = 0; i < exceptions.length; i++) {
        Throwable e = exceptions[i];
        if (e != null) {
            throw new AssertionError("Unexpected exception in thread" +
                                     (i + 1) + ": " + e, e);
        }
    }
}