public void testRetryingPartialFailures()

in tst/com/amazon/kinesis/streaming/agent/tailing/KinesisSenderTest.java [137:178]


    /**
     * Sends the same buffer repeatedly through a {@code KinesisSender} backed by a mocked
     * Kinesis client and checks the outcome of every attempt.
     *
     * <p>For each entry of {@code successiveFailedRecords} one {@code sendBuffer} call is made
     * and must end in {@code PARTIAL_SUCCESS}, leaving exactly as many records in the buffer
     * as the entry has elements, each located at one of the listed positions of the buffer as
     * it was before that call. One further call must then end in {@code SUCCESS}.
     *
     * @param recordCount value handed to {@code getTestKinesisBuffer} when building the test
     *        buffer (presumably the number of records it holds; confirm against that helper)
     * @param successiveFailedRecords one array of buffer positions per scripted attempt; every
     *        array must be non-empty, and each one is sorted in place by this method
     */
    public void testRetryingPartialFailures(final int recordCount, final int[][] successiveFailedRecords) {
        final RecordBuffer<KinesisRecord> buffer = getTestKinesisBuffer(flow, recordCount);

        // Sorted in place because the membership check further down uses Arrays.binarySearch.
        for (int[] positions : successiveFailedRecords) {
            Arrays.sort(positions);
        }

        final AtomicInteger callCounter = new AtomicInteger(0);
        final Answer<PutRecordsResult> scriptedAnswer = new Answer<PutRecordsResult>() {
            @Override
            public PutRecordsResult answer(InvocationOnMock invocation) throws Throwable {
                // Claim this call's slot up front; the counter is only ever read in here.
                final int call = callCounter.getAndIncrement();
                final int[] failedThisCall;
                if (call < successiveFailedRecords.length) {
                    failedThisCall = successiveFailedRecords[call];
                } else {
                    // Script exhausted: no failed positions are passed on from here on.
                    failedThisCall = new int[]{};
                }
                return answerKinesisPutRecords(invocation, buffer, failedThisCall);
            }
        };

        final AmazonKinesisClient mockClient = Mockito.mock(AmazonKinesisClient.class);
        Mockito.when(mockClient.putRecords(Mockito.any(PutRecordsRequest.class))).then(scriptedAnswer);

        // Swap the context for a spy so the sender under test picks up the mocked client.
        context = Mockito.spy(context);
        Mockito.when(context.getKinesisClient()).thenReturn(mockClient);
        final KinesisSender sender = new KinesisSender(context, flow);

        for (int[] expectedFailures : successiveFailedRecords) {
            Assert.assertTrue(expectedFailures.length > 0, "Failed records array cannot be empty in this test!");

            // Snapshot taken before sending, so leftover records can be mapped back to positions.
            final List<KinesisRecord> beforeSend = Lists.newArrayList(buffer);
            final BufferSendResult<KinesisRecord> partial = sender.sendBuffer(buffer);

            Assert.assertNotNull(partial);
            Assert.assertEquals(partial.getBuffer().id(), buffer.id());
            Assert.assertEquals(partial.getStatus(), BufferSendResult.Status.PARTIAL_SUCCESS);
            Assert.assertEquals(partial.getBuffer().sizeRecords(), expectedFailures.length);

            for (KinesisRecord leftover : partial.getBuffer()) {
                final int position = beforeSend.indexOf(leftover);
                Assert.assertTrue(Arrays.binarySearch(expectedFailures, position) >= 0);
            }
        }

        // Every scripted failure has been consumed, so this send is expected to succeed fully.
        final BufferSendResult<KinesisRecord> last = sender.sendBuffer(buffer);
        Assert.assertNotNull(last);
        Assert.assertEquals(last.getStatus(), BufferSendResult.Status.SUCCESS);
        Assert.assertEquals(last.getBuffer().id(), buffer.id());

        final int expectedCalls = successiveFailedRecords.length + 1;
        Mockito.verify(mockClient, Mockito.times(expectedCalls)).putRecords(Mockito.any(PutRecordsRequest.class));
    }