in tst/com/amazon/kinesis/streaming/agent/tailing/FirehoseSenderTest.java [141:182]
/**
 * Sends the same buffer repeatedly through a {@code FirehoseSender} backed by a mocked
 * Firehose client and checks that every scripted partial failure is reported as
 * {@code PARTIAL_SUCCESS}, followed by a final {@code SUCCESS} once the script runs out.
 *
 * @param recordCount value handed to {@code getTestBuffer} when creating the test buffer
 * @param successiveFailedRecords one array per send attempt; each holds the indices (relative
 *        to the buffer contents just before that attempt) expected to remain in the buffer
 *        afterwards. Every array must be non-empty. The arrays are sorted in place.
 */
public void testRetryingPartialFailures(final int recordCount, final int[][] successiveFailedRecords) {
    final RecordBuffer<FirehoseRecord> testBuffer = getTestBuffer(flow, recordCount);
    final AtomicInteger callCounter = new AtomicInteger(0);

    // Arrays.binarySearch below requires each index array to be sorted.
    for (int[] indices : successiveFailedRecords) {
        Arrays.sort(indices);
    }

    AmazonKinesisFirehose firehose = Mockito.mock(AmazonKinesisFirehose.class);
    Mockito.when(firehose.putRecordBatch(Mockito.any(PutRecordBatchRequest.class))).then(new Answer<PutRecordBatchResult>() {
        @Override
        public PutRecordBatchResult answer(InvocationOnMock invocation) throws Throwable {
            // Each invocation consumes one entry of the failure script; the counter advances
            // regardless of whether building the answer succeeds.
            final int callIndex = callCounter.getAndIncrement();
            final int[] failedRecords;
            if (callIndex < successiveFailedRecords.length) {
                failedRecords = successiveFailedRecords[callIndex];
            } else {
                // Script exhausted: report no failed records from here on.
                failedRecords = new int[]{};
            }
            // NOTE(review): the helper presumably builds a result that marks the given indices
            // as failed -- confirm against its definition.
            return answerFirehosePutRecordBatch(invocation, testBuffer, failedRecords);
        }
    });

    // Route the sender to the mocked client through a spied context.
    context = Mockito.spy(context);
    Mockito.when(context.getFirehoseClient()).thenReturn(firehose);
    FirehoseSender sender = new FirehoseSender(context, flow);

    for (int[] expectedFailures : successiveFailedRecords) {
        Assert.assertTrue(expectedFailures.length > 0, "Failed records array cannot be empty in this test!");

        // Snapshot the buffer contents before sending so surviving records can be mapped
        // back to their pre-send positions.
        List<FirehoseRecord> beforeSend = Lists.newArrayList(testBuffer);
        BufferSendResult<FirehoseRecord> partialResult = sender.sendBuffer(testBuffer);

        Assert.assertNotNull(partialResult);
        Assert.assertEquals(partialResult.getBuffer().id(), testBuffer.id());
        Assert.assertEquals(partialResult.getStatus(), BufferSendResult.Status.PARTIAL_SUCCESS);
        Assert.assertEquals(partialResult.getBuffer().sizeRecords(), expectedFailures.length);

        // Every record still in the buffer must be one of the scripted failures.
        for (FirehoseRecord survivor : partialResult.getBuffer()) {
            int positionBeforeSend = beforeSend.indexOf(survivor);
            Assert.assertTrue(Arrays.binarySearch(expectedFailures, positionBeforeSend) >= 0);
        }
    }

    // The failure script is now exhausted, so the next send is expected to succeed fully.
    BufferSendResult<FirehoseRecord> finalResult = sender.sendBuffer(testBuffer);
    Assert.assertNotNull(finalResult);
    Assert.assertEquals(finalResult.getStatus(), BufferSendResult.Status.SUCCESS);
    Assert.assertEquals(finalResult.getBuffer().id(), testBuffer.id());

    // One client call per scripted partial failure plus the final successful one.
    Mockito.verify(firehose, Mockito.times(successiveFailedRecords.length + 1)).putRecordBatch(Mockito.any(PutRecordBatchRequest.class));
}