in tst/com/amazon/kinesis/streaming/agent/tailing/PublishingQueueTest.java [600:666]
public void testTakeWithRetryBuffers() throws Exception {
    // Scenario: interleave regular and retry buffers and verify that take()
    // hands out queued retry buffers first (FIFO among themselves), then
    // regular buffers (also FIFO), and that take() on an empty queue returns
    // null only after at least maxWaitOnEmptyQueue milliseconds have elapsed.
    PublishingQueue<FirehoseRecord> queue = getTestQueue();

    // Phase 1: two regular (never-published) buffers.
    RecordBuffer<FirehoseRecord> firstRegular = queueAndFlushBuffer(queue);
    RecordBuffer<FirehoseRecord> secondRegular = queueAndFlushBuffer(queue);
    Assert.assertEquals(queue.size(), 2);

    // Phase 2: two retry buffers queued behind them.
    RecordBuffer<FirehoseRecord> firstRetry = getTestBuffer(flow);
    RecordBuffer<FirehoseRecord> secondRetry = getTestBuffer(flow);
    queue.queueBufferForRetry(firstRetry);
    queue.queueBufferForRetry(secondRetry);
    Assert.assertEquals(queue.size(), 4);

    // Phase 3: two more regular (never-published) buffers.
    RecordBuffer<FirehoseRecord> thirdRegular = queueAndFlushBuffer(queue);
    RecordBuffer<FirehoseRecord> fourthRegular = queueAndFlushBuffer(queue);
    Assert.assertEquals(queue.size(), 6);

    // The retry buffers must come out first, in FIFO order, without blocking.
    Stopwatch retryDrainWatch = Stopwatch.createStarted();
    Assert.assertSame(queue.take(), firstRetry);
    Assert.assertSame(queue.take(), secondRetry);
    long retryDrainMillis = retryDrainWatch.elapsed(TimeUnit.MILLISECONDS);
    Assert.assertTrue(retryDrainMillis < maxWaitOnEmptyQueue);
    Assert.assertEquals(queue.size(), 4);

    // Next come the two oldest regular buffers, in FIFO order, without blocking.
    Stopwatch regularDrainWatch = Stopwatch.createStarted();
    Assert.assertSame(queue.take(), firstRegular);
    Assert.assertSame(queue.take(), secondRegular);
    long regularDrainMillis = regularDrainWatch.elapsed(TimeUnit.MILLISECONDS);
    Assert.assertTrue(regularDrainMillis < maxWaitOnEmptyQueue);
    Assert.assertEquals(queue.size(), 2);

    // Newly queued retry buffers jump ahead of the regular buffers still waiting.
    RecordBuffer<FirehoseRecord> thirdRetry = getTestBuffer(flow);
    RecordBuffer<FirehoseRecord> fourthRetry = getTestBuffer(flow);
    queue.queueBufferForRetry(thirdRetry);
    queue.queueBufferForRetry(fourthRetry);
    Assert.assertEquals(queue.size(), 4);
    Stopwatch lateRetryWatch = Stopwatch.createStarted();
    Assert.assertSame(queue.take(), thirdRetry);
    Assert.assertSame(queue.take(), fourthRetry);
    long lateRetryMillis = lateRetryWatch.elapsed(TimeUnit.MILLISECONDS);
    Assert.assertTrue(lateRetryMillis < maxWaitOnEmptyQueue);
    Assert.assertEquals(queue.size(), 2);

    // The remaining regular buffers come out last, in FIFO order, without blocking.
    Stopwatch tailDrainWatch = Stopwatch.createStarted();
    Assert.assertSame(queue.take(), thirdRegular);
    Assert.assertSame(queue.take(), fourthRegular);
    long tailDrainMillis = tailDrainWatch.elapsed(TimeUnit.MILLISECONDS);
    Assert.assertTrue(tailDrainMillis < maxWaitOnEmptyQueue);

    // The queue is now fully drained, and its counters reflect that.
    Assert.assertEquals(queue.size(), 0);
    Assert.assertEquals(queue.totalRecords(), 0);
    Assert.assertEquals(queue.totalBytes(), 0);

    // With nothing queued, take() waits out the timeout and yields null.
    Stopwatch emptyWaitWatch = Stopwatch.createStarted();
    Assert.assertEquals(queue.size(), 0);
    Assert.assertNull(queue.take());
    long emptyWaitMillis = emptyWaitWatch.elapsed(TimeUnit.MILLISECONDS);
    Assert.assertTrue(emptyWaitMillis >= maxWaitOnEmptyQueue);

    // A retry buffer queued afterwards is handed out again without blocking.
    RecordBuffer<FirehoseRecord> fifthRetry = getTestBuffer(flow);
    queue.queueBufferForRetry(fifthRetry);
    Stopwatch finalRetryWatch = Stopwatch.createStarted();
    Assert.assertSame(queue.take(), fifthRetry);
    long finalRetryMillis = finalRetryWatch.elapsed(TimeUnit.MILLISECONDS);
    Assert.assertTrue(finalRetryMillis < maxWaitOnEmptyQueue);
}