public void testRetryingPartialFailures()

in tst/com/amazon/kinesis/streaming/agent/tailing/FirehoseSenderTest.java [141:182]


    public void testRetryingPartialFailures(final int recordCount, final int[][] successiveFailedRecords) {
        final RecordBuffer<FirehoseRecord> testBuffer = getTestBuffer(flow, recordCount);
        final AtomicInteger attempt = new AtomicInteger(0);
        for(int[] failedRecords : successiveFailedRecords)
            Arrays.sort(failedRecords);
        AmazonKinesisFirehose firehose = Mockito.mock(AmazonKinesisFirehose.class);
        Mockito.when(firehose.putRecordBatch(Mockito.any(PutRecordBatchRequest.class))).then(new Answer<PutRecordBatchResult>() {
            @Override
            public PutRecordBatchResult answer(InvocationOnMock invocation) throws Throwable {
                try {
                    // When successiveFailedRecords return assume no more failed records
                    int[] failedRecords = attempt.get() < successiveFailedRecords.length ? successiveFailedRecords[attempt.get()] : new int[]{};
                    return answerFirehosePutRecordBatch(invocation, testBuffer, failedRecords);
                } finally {
                    attempt.incrementAndGet();
                }
            }
        });
        context = Mockito.spy(context);
        Mockito.when(context.getFirehoseClient()).thenReturn(firehose);
        FirehoseSender sender = new FirehoseSender(context, flow);
        for(int i = 0; i < successiveFailedRecords.length; ++i) {
            Assert.assertTrue(successiveFailedRecords[i].length > 0, "Failed records array cannot be empty in this test!");
            List<FirehoseRecord> recordsInBuffer = Lists.newArrayList(testBuffer);
            BufferSendResult<FirehoseRecord> result = sender.sendBuffer(testBuffer);
            Assert.assertNotNull(result);
            Assert.assertEquals(result.getBuffer().id(), testBuffer.id());
            Assert.assertEquals(result.getStatus(), BufferSendResult.Status.PARTIAL_SUCCESS);
            Assert.assertEquals(result.getBuffer().sizeRecords(), successiveFailedRecords[i].length);
            for(FirehoseRecord remainingRecord : result.getBuffer()) {
                int indexInOriginal = recordsInBuffer.indexOf(remainingRecord);
                boolean found = Arrays.binarySearch(successiveFailedRecords[i], indexInOriginal) >= 0;
                Assert.assertTrue(found);
            }
        }
        // Since we exhausted the successiveFailedRecords, following call will return full success
        BufferSendResult<FirehoseRecord> result = sender.sendBuffer(testBuffer);
        Assert.assertNotNull(result);
        Assert.assertEquals(result.getStatus(), BufferSendResult.Status.SUCCESS);
        Assert.assertEquals(result.getBuffer().id(), testBuffer.id());
        Mockito.verify(firehose, Mockito.times(successiveFailedRecords.length + 1)).putRecordBatch(Mockito.any(PutRecordBatchRequest.class));
    }