in tst/com/amazon/kinesis/streaming/agent/tailing/PublishingQueueTest.java [363:414]
public void testNonBlockingOfferRecordWithFullQueue() throws Exception {
    /*
     * Scenario under test: non-blocking offerRecord(record, false) against a
     * queue that is already at capacity.
     *   1. Fill the queue to capacity.
     *   2. Offer one temp buffer's worth of records; each offer must succeed
     *      without the queue size changing.
     *   3. Offer one more record; it must be rejected, with no side-effects.
     *   4. take() one buffer; the pending records are expected to move into
     *      the queue and the previously rejected record is then accepted.
     * Every phase is also required to finish in under maxWaitOnFullQueue ms.
     */
    PublishingQueue<FirehoseRecord> queue = getTestQueue();

    // Phase 1: keep flushing buffers into the queue until it reaches capacity.
    Stopwatch fillTimer = Stopwatch.createStarted();
    while (queue.size() < queue.capacity()) {
        queueAndFlushBuffer(queue);
    }
    Assert.assertEquals(queue.size(), queue.capacity());
    long fillMillis = fillTimer.elapsed(TimeUnit.MILLISECONDS);
    Assert.assertTrue(fillMillis < maxWaitOnFullQueue);

    // Phase 2: offer exactly the maximum number of records a buffer may hold,
    // using fixed-size records (zero jitter) sized off the flow's limits.
    Stopwatch batchTimer = Stopwatch.createStarted();
    final int maxRecords = flow.getMaxBufferSizeRecords();
    final int bytesPerRecord = flow.getMaxRecordSizeBytes() / maxRecords / 2;
    final double noJitter = 0.0;
    RecordGenerator recordSource = new RecordGenerator(bytesPerRecord, noJitter);
    long offeredBytes = 0;
    int offered = 0;
    while (offered < maxRecords) {
        FirehoseRecord next = getTestRecord(flow, recordSource);
        boolean accepted = queue.offerRecord(next, false);
        Assert.assertTrue(accepted);
        offeredBytes += next.lengthWithOverhead();
        offered++;
    }

    // Sanity check on the test's own preconditions: the batch must stay under
    // the byte limit and must have been produced faster than the max buffer
    // age, otherwise the checks below would not be meaningful.
    Assert.assertTrue(offeredBytes < flow.getMaxBufferSizeBytes());
    long batchMillisVsAge = batchTimer.elapsed(TimeUnit.MILLISECONDS);
    Assert.assertTrue(batchMillisVsAge < flow.getMaxBufferAgeMillis());
    long batchMillisVsWait = batchTimer.elapsed(TimeUnit.MILLISECONDS);
    Assert.assertTrue(batchMillisVsWait < maxWaitOnFullQueue);

    // All offered records are pending, and the queue itself did not grow.
    Assert.assertEquals(queue.pendingRecords(), maxRecords);
    Assert.assertEquals(queue.size(), queue.capacity());

    // Phase 3: one record beyond the full temp buffer must be turned away,
    // and turned away quickly.
    FirehoseRecord overflow = getTestRecord(flow, recordSource);
    Stopwatch rejectTimer = Stopwatch.createStarted();
    boolean overflowAccepted = queue.offerRecord(overflow, false);
    Assert.assertFalse(overflowAccepted);
    long rejectMillis = rejectTimer.elapsed(TimeUnit.MILLISECONDS);
    Assert.assertTrue(rejectMillis < maxWaitOnFullQueue);

    // The rejected offer must leave both counters exactly as they were.
    Assert.assertEquals(queue.pendingRecords(), maxRecords);
    Assert.assertEquals(queue.size(), queue.capacity());

    // Phase 4: removing one buffer makes room; the test expects the waiting
    // temp buffer to take that slot (queue full again, nothing pending).
    Assert.assertNotNull(queue.take());
    Stopwatch retryTimer = Stopwatch.createStarted();
    Assert.assertEquals(queue.size(), queue.capacity());
    Assert.assertEquals(queue.pendingRecords(), 0);

    // The record rejected earlier now goes through and is the only pending one.
    boolean retryAccepted = queue.offerRecord(overflow, false);
    Assert.assertTrue(retryAccepted);
    long retryMillis = retryTimer.elapsed(TimeUnit.MILLISECONDS);
    Assert.assertTrue(retryMillis < maxWaitOnFullQueue);
    Assert.assertEquals(queue.pendingRecords(), 1);
}