in tst/com/amazon/kinesis/streaming/agent/tailing/KinesisSenderTest.java [137:178]
public void testRetryingPartialFailures(final int recordCount, final int[][] successiveFailedRecords) {
    // Scenario: the mocked Kinesis client is scripted with one set of failed
    // record indices per putRecords call (successiveFailedRecords[k] for the
    // k-th call). Each send is expected to end in PARTIAL_SUCCESS with exactly
    // those records left in the buffer; once the script runs out, one more
    // send is expected to end in SUCCESS.
    //
    // recordCount             number of records requested from getTestKinesisBuffer
    // successiveFailedRecords per-call failed indices, relative to the buffer
    //                         contents at the time of that call; every entry
    //                         must be non-empty
    final RecordBuffer<KinesisRecord> testBuffer = getTestKinesisBuffer(flow, recordCount);
    final AtomicInteger putRecordsCalls = new AtomicInteger(0);

    // Sort in place so the membership checks further down can rely on Arrays.binarySearch.
    for (int[] failedIndices : successiveFailedRecords) {
        Arrays.sort(failedIndices);
    }

    final AmazonKinesisClient kinesisClient = Mockito.mock(AmazonKinesisClient.class);
    final Answer<PutRecordsResult> scriptedAnswer = new Answer<PutRecordsResult>() {
        @Override
        public PutRecordsResult answer(InvocationOnMock invocation) throws Throwable {
            try {
                final int callIndex = putRecordsCalls.get();
                final int[] failedIndices;
                if (callIndex < successiveFailedRecords.length) {
                    failedIndices = successiveFailedRecords[callIndex];
                } else {
                    // Script exhausted: report no failed records from here on.
                    failedIndices = new int[]{};
                }
                return answerKinesisPutRecords(invocation, testBuffer, failedIndices);
            } finally {
                // Count the call even if building the answer threw.
                putRecordsCalls.incrementAndGet();
            }
        }
    };
    Mockito.when(kinesisClient.putRecords(Mockito.any(PutRecordsRequest.class))).thenAnswer(scriptedAnswer);

    // Replace the shared context with a spy so it hands out the mocked client.
    context = Mockito.spy(context);
    Mockito.when(context.getKinesisClient()).thenReturn(kinesisClient);
    final KinesisSender sender = new KinesisSender(context, flow);

    for (int[] expectedFailures : successiveFailedRecords) {
        Assert.assertTrue(expectedFailures.length > 0, "Failed records array cannot be empty in this test!");

        // Snapshot the buffer contents before sending so that the records left
        // afterwards can be mapped back to their pre-send positions.
        final List<KinesisRecord> beforeSend = Lists.newArrayList(testBuffer);
        final BufferSendResult<KinesisRecord> partialResult = sender.sendBuffer(testBuffer);

        Assert.assertNotNull(partialResult);
        Assert.assertEquals(partialResult.getBuffer().id(), testBuffer.id());
        Assert.assertEquals(partialResult.getStatus(), BufferSendResult.Status.PARTIAL_SUCCESS);
        Assert.assertEquals(partialResult.getBuffer().sizeRecords(), expectedFailures.length);

        // Every record still in the buffer must be one of the scripted failures.
        for (KinesisRecord leftover : partialResult.getBuffer()) {
            final int positionBeforeSend = beforeSend.indexOf(leftover);
            Assert.assertTrue(Arrays.binarySearch(expectedFailures, positionBeforeSend) >= 0);
        }
    }

    // All scripted failures have been consumed, so the mock now reports no
    // failed records and this send is expected to be a full success.
    final BufferSendResult<KinesisRecord> finalResult = sender.sendBuffer(testBuffer);
    Assert.assertNotNull(finalResult);
    Assert.assertEquals(finalResult.getStatus(), BufferSendResult.Status.SUCCESS);
    Assert.assertEquals(finalResult.getBuffer().id(), testBuffer.id());

    // One putRecords call per scripted partial failure, plus the final successful one.
    final int expectedPutRecordsCalls = successiveFailedRecords.length + 1;
    Mockito.verify(kinesisClient, Mockito.times(expectedPutRecordsCalls)).putRecords(Mockito.any(PutRecordsRequest.class));
}