in src/main/java/com/amazonaws/services/kinesis/scaling/StreamScalingUtils.java [224:255]
/**
 * Runs a Kinesis operation, retrying when the stream is busy or the API call is throttled.
 *
 * <p>Each attempt invokes {@code operation.run(kinesisClient)} and, if {@code waitForActive}
 * is set, then waits for the stream to report the "ACTIVE" status. A
 * {@code ResourceInUseException} or {@code LimitExceededException} raised by either step
 * counts as a failed attempt and triggers a retry after a pause. Any other exception
 * propagates to the caller immediately.
 *
 * <p>At least one attempt is always made, even when {@code retries} is zero or negative.
 *
 * @param kinesisClient client handed to the operation and to the status wait
 * @param operation     the operation to execute
 * @param streamName    stream name used for the status wait and in log messages
 * @param retries       maximum number of attempts
 * @param waitForActive whether to wait for the stream to become "ACTIVE" after the operation
 * @return whatever {@code operation.run} returned on the successful attempt
 * @throws Exception if no attempt succeeded within {@code retries} attempts (the last
 *                   retryable failure is attached as the cause), if the thread is
 *                   interrupted while pausing, or if the operation throws a
 *                   non-retryable exception
 */
private static Object doOperation(KinesisClient kinesisClient, KinesisOperation operation, String streamName,
        int retries, boolean waitForActive) throws Exception {
    boolean done = false;
    int attempts = 0;
    Object result = null;
    // remembered so the final failure can report why the operation never succeeded
    Exception lastFailure = null;
    do {
        attempts++;
        try {
            result = operation.run(kinesisClient);
            if (waitForActive) {
                // NOTE(review): this wait sits inside the retry scope, so if it throws one of
                // the exceptions caught below the operation itself is run again - confirm
                // that is intended for operations that are not safe to repeat
                waitForStreamStatus(kinesisClient, streamName, "ACTIVE");
            }
            done = true;
        } catch (ResourceInUseException e) {
            // thrown when the Shard is mutating - wait until we are able to
            // do the modification or ResourceNotFoundException is thrown
            lastFailure = e;
            if (attempts < retries) {
                // only pause when another attempt will actually follow
                Thread.sleep(1000);
            }
        } catch (LimitExceededException lee) {
            // API Throttling
            lastFailure = lee;
            LOG.warn(String.format("LimitExceededException for Stream %s", streamName));
            if (attempts < retries) {
                // pause length is derived from the attempt count; skipped on the last attempt
                Thread.sleep(getTimeoutDuration(attempts));
            }
        }
    } while (!done && attempts < retries);
    if (!done) {
        throw new Exception(String.format("Unable to Complete Kinesis Operation after %s Retries", retries),
                lastFailure);
    }
    return result;
}