public void testNonBlockingOfferRecordWithFullQueue()

in tst/com/amazon/kinesis/streaming/agent/tailing/PublishingQueueTest.java [363:414]


    /**
     * Exercises {@code offerRecord(record, false)} against a queue that is
     * already holding {@code capacity()} buffers.
     *
     * <p>The sequence asserted here is:
     * <ol>
     *   <li>the queue is filled to capacity via {@code queueAndFlushBuffer};</li>
     *   <li>{@code flow.getMaxBufferSizeRecords()} further records are offered and
     *       each offer returns {@code true}, leaving them pending while the queue
     *       size stays at capacity;</li>
     *   <li>one more offer returns {@code false} and changes neither the pending
     *       count nor the queue size;</li>
     *   <li>after a single {@code take()}, the queue is back at capacity with zero
     *       pending records, and the previously rejected record is accepted.</li>
     * </ol>
     * Every phase is also required to complete in less than
     * {@code maxWaitOnFullQueue} milliseconds.
     *
     * @throws Exception if any of the queue operations or test helpers throws
     */
    public void testNonBlockingOfferRecordWithFullQueue() throws Exception {
        final PublishingQueue<FirehoseRecord> queue = getTestQueue();

        // Phase 1: saturate the queue with flushed buffers.
        final Stopwatch fillTimer = Stopwatch.createStarted();
        while (queue.size() < queue.capacity()) {
            queueAndFlushBuffer(queue);
        }
        Assert.assertEquals(queue.size(), queue.capacity());
        Assert.assertTrue(fillTimer.elapsed(TimeUnit.MILLISECONDS) < maxWaitOnFullQueue);

        // Phase 2: offer one batch worth of records. The per-record size is chosen
        // so the batch is meant to stay under the byte limit (checked below).
        final Stopwatch batchTimer = Stopwatch.createStarted();
        final int batchRecords = flow.getMaxBufferSizeRecords();
        final int bytesPerRecord = flow.getMaxRecordSizeBytes() / batchRecords / 2;
        // A jitter of 0.0 is passed so the generator is not asked to vary record sizes.
        final RecordGenerator recordSource = new RecordGenerator(bytesPerRecord, 0.0);
        long bytesOffered = 0;
        int offered = 0;
        while (offered < batchRecords) {
            final FirehoseRecord next = getTestRecord(flow, recordSource);
            final boolean accepted = queue.offerRecord(next, false);
            Assert.assertTrue(accepted);
            bytesOffered += next.lengthWithOverhead();
            offered++;
        }

        // Sanity check on the test's own premises: no buffer should have been
        // produced yet, provided the batch stayed below the byte limit and the
        // loop finished before the maximum buffer age elapsed.
        Assert.assertTrue(bytesOffered < flow.getMaxBufferSizeBytes());
        Assert.assertTrue(batchTimer.elapsed(TimeUnit.MILLISECONDS) < flow.getMaxBufferAgeMillis());
        Assert.assertTrue(batchTimer.elapsed(TimeUnit.MILLISECONDS) < maxWaitOnFullQueue);

        // All offers were accepted, yet the queue did not grow: the records are pending.
        Assert.assertEquals(queue.pendingRecords(), batchRecords);
        Assert.assertEquals(queue.size(), queue.capacity());

        // Phase 3: the temp buffer is full, so one more offer must be turned away.
        final FirehoseRecord overflow = getTestRecord(flow, recordSource);
        final Stopwatch rejectTimer = Stopwatch.createStarted();
        final boolean overflowAccepted = queue.offerRecord(overflow, false);
        Assert.assertFalse(overflowAccepted);
        Assert.assertTrue(rejectTimer.elapsed(TimeUnit.MILLISECONDS) < maxWaitOnFullQueue);
        // The rejected offer must leave the queue state untouched.
        Assert.assertEquals(queue.pendingRecords(), batchRecords);
        Assert.assertEquals(queue.size(), queue.capacity());

        // Phase 4: taking one buffer frees a slot; the mature temp buffer that
        // could not be queued before is expected to fill it straight away.
        Assert.assertNotNull(queue.take());
        final Stopwatch retryTimer = Stopwatch.createStarted();
        Assert.assertEquals(queue.size(), queue.capacity());
        Assert.assertEquals(queue.pendingRecords(), 0);
        final boolean retryAccepted = queue.offerRecord(overflow, false);
        Assert.assertTrue(retryAccepted);
        Assert.assertTrue(retryTimer.elapsed(TimeUnit.MILLISECONDS) < maxWaitOnFullQueue);
        Assert.assertEquals(queue.pendingRecords(), 1);
    }