in tst/com/amazon/kinesis/streaming/agent/tailing/AbstractParserTest.java [332:386]
public void testContinueWhileParsingBuffer() throws IOException {
Path testFile1 = testFiles.createTempFile();
RecordGenerator generator = new RecordGenerator();
generator.appendDataToFile(testFile1, getTestBytes());
// Parse the file1 to the end
P parser = buildParser();
parser = spy(parser);
TrackedFile file1 = new TrackedFile(flow, testFile1);
file1.open(0);
assertTrue(parser.switchParsingToFile(file1));
List<R> records = parseAllRecords(parser, null);
assertTrue(records.size() > 0); // SANITYCHECK: Ensure we didn't shoot blanks
// Validate that we're at the end of the file, and the current buffer is empty
assertFalse(parser.currentBuffer.hasRemaining());
assertEquals(parser.currentFileChannel.position(), Files.size(testFile1));
assertEquals(parser.bufferedBytesRemaining(), 0);
// Add 2 more records, then read only 1
generator.appendRecordsToFile(file1.getPath(), 2);
R record1 = parser.readRecord();
assertNotNull(record1);
assertSame(record1.file(), file1);
assertSame(parser.currentFile, file1);
assertTrue(record1.startOffset() > 0);
assertTrue(record1.endOffset() < Files.size(testFile1));
assertTrue(parser.currentBuffer.hasRemaining()); // There's one more record in the buffer
assertTrue(parser.bufferedBytesRemaining() > 0);
assertEquals(parser.currentFileChannel.position(), Files.size(testFile1));
// Now simulate rotation and write a single record into the new file
Path testFile2 = testFiles.createTempFile();
generator.appendRecordsToFile(testFile2, 1);
TrackedFile file2 = new TrackedFile(flow, testFile2);
file2.open(0);
assertTrue(parser.continueParsingWithFile(file2));
assertTrue(parser.isParsing());
// Now read a single record from the parser... it should be the record from the previous file
R record2 = parser.readRecord();
assertNotNull(record2);
assertSame(record2.file(), file1);
assertSame(parser.currentFile, file2);
assertTrue(record2.startOffset() > 0);
assertEquals(record2.endOffset(), Files.size(testFile1));
assertFalse(parser.currentBuffer.hasRemaining());
// The following record should be the record from the new file
R record3 = parser.readRecord();
assertNotNull(record3);
assertSame(record3.file(), file2);
assertEquals(record3.startOffset(), 0);
assertEquals(record3.endOffset(), Files.size(testFile2));
}