public void testTakeWithRetryBuffers()

in tst/com/amazon/kinesis/streaming/agent/tailing/PublishingQueueTest.java [600:666]


    public void testTakeWithRetryBuffers() throws Exception {
        PublishingQueue<FirehoseRecord> q = getTestQueue();
        // Put couple of regular (never-published) buffers
        RecordBuffer<FirehoseRecord> buffer1 = queueAndFlushBuffer(q);
        RecordBuffer<FirehoseRecord> buffer2 = queueAndFlushBuffer(q);
        Assert.assertEquals(q.size(), 2);

        // Then put a couple of retry buffers
        RecordBuffer<FirehoseRecord> retryBuffer1 = getTestBuffer(flow);
        RecordBuffer<FirehoseRecord> retryBuffer2 = getTestBuffer(flow);
        q.queueBufferForRetry(retryBuffer1);
        q.queueBufferForRetry(retryBuffer2);
        Assert.assertEquals(q.size(), 4);

        // Put couple more regular (never-published) buffers
        RecordBuffer<FirehoseRecord> buffer3 = queueAndFlushBuffer(q);
        RecordBuffer<FirehoseRecord> buffer4 = queueAndFlushBuffer(q);
        Assert.assertEquals(q.size(), 6);

        // Following two calls will return immediately with retry buffers in FIFO order.
        Stopwatch timer = Stopwatch.createStarted();
        Assert.assertSame(q.take(), retryBuffer1);
        Assert.assertSame(q.take(), retryBuffer2);
        Assert.assertTrue(timer.elapsed(TimeUnit.MILLISECONDS) < maxWaitOnEmptyQueue);
        Assert.assertEquals(q.size(), 4);

        // Following two calls will return immediately with regular buffers in FIFO order.
        timer = Stopwatch.createStarted();
        Assert.assertSame(q.take(), buffer1);
        Assert.assertSame(q.take(), buffer2);
        Assert.assertTrue(timer.elapsed(TimeUnit.MILLISECONDS) < maxWaitOnEmptyQueue);
        Assert.assertEquals(q.size(), 2);

        // Then put a couple more retry buffers and see that they take precedence
        RecordBuffer<FirehoseRecord> retryBuffer3 = getTestBuffer(flow);
        RecordBuffer<FirehoseRecord> retryBuffer4 = getTestBuffer(flow);
        q.queueBufferForRetry(retryBuffer3);
        q.queueBufferForRetry(retryBuffer4);
        Assert.assertEquals(q.size(), 4);
        timer = Stopwatch.createStarted();
        Assert.assertSame(q.take(), retryBuffer3);
        Assert.assertSame(q.take(), retryBuffer4);
        Assert.assertTrue(timer.elapsed(TimeUnit.MILLISECONDS) < maxWaitOnEmptyQueue);
        Assert.assertEquals(q.size(), 2);

        // Following two calls will return immediately with remaining regular buffers in FIFO order.
        timer = Stopwatch.createStarted();
        Assert.assertSame(q.take(), buffer3);
        Assert.assertSame(q.take(), buffer4);
        Assert.assertTrue(timer.elapsed(TimeUnit.MILLISECONDS) < maxWaitOnEmptyQueue);
        Assert.assertEquals(q.size(), 0);
        Assert.assertEquals(q.totalRecords(), 0);
        Assert.assertEquals(q.totalBytes(), 0);

        // The next call will block and eventually return null
        timer = Stopwatch.createStarted();
        Assert.assertEquals(q.size(), 0);
        Assert.assertNull(q.take());
        Assert.assertTrue(timer.elapsed(TimeUnit.MILLISECONDS) >= maxWaitOnEmptyQueue);

        // Now add another retry buffer and try again
        RecordBuffer<FirehoseRecord> retryBuffer5 = getTestBuffer(flow);
        q.queueBufferForRetry(retryBuffer5);
        timer = Stopwatch.createStarted();
        Assert.assertSame(q.take(), retryBuffer5);
        Assert.assertTrue(timer.elapsed(TimeUnit.MILLISECONDS) < maxWaitOnEmptyQueue);
    }