in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java [385:423]
/**
 * Requests a shard iterator from the Kinesis client, retrying on recoverable service errors.
 *
 * <p>The request is attempted while {@code retryCount <= getShardIteratorMaxRetries}, i.e. at
 * most {@code getShardIteratorMaxRetries + 1} times. After each recoverable
 * {@link AmazonServiceException} (as decided by {@code isRecoverableException}) the method
 * sleeps for a duration computed by {@code BACKOFF.calculateFullJitterBackoff} before the next
 * attempt. Non-recoverable service exceptions are rethrown immediately.
 *
 * @param getShardIteratorRequest the request passed unchanged to the Kinesis client on every
 *     attempt
 * @return the shard iterator string taken from the successful result
 * @throws InterruptedException propagated from the calls made here (presumably the backoff
 *     sleep being interrupted - confirm against {@code BACKOFF.sleep})
 * @throws RuntimeException if no attempt succeeded; the last recoverable
 *     {@link AmazonServiceException}, if any, is attached as the cause
 */
private String getShardIterator(GetShardIteratorRequest getShardIteratorRequest)
        throws InterruptedException {
    GetShardIteratorResult getShardIteratorResult = null;
    // Most recent recoverable failure; attached as the cause if all attempts are exhausted so
    // the underlying service error is not lost.
    AmazonServiceException lastRecoverableException = null;
    int retryCount = 0;
    while (retryCount <= getShardIteratorMaxRetries && getShardIteratorResult == null) {
        try {
            getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
        } catch (AmazonServiceException ex) {
            if (isRecoverableException(ex)) {
                lastRecoverableException = ex;
                // The current retryCount is used for the backoff computation, then incremented.
                long backoffMillis =
                        BACKOFF.calculateFullJitterBackoff(
                                getShardIteratorBaseBackoffMillis,
                                getShardIteratorMaxBackoffMillis,
                                getShardIteratorExpConstant,
                                retryCount++);
                LOG.warn(
                        "Got recoverable AmazonServiceException. Backing off for "
                                + backoffMillis
                                + " millis ("
                                + ex.getClass().getName()
                                + ": "
                                + ex.getMessage()
                                + ")");
                // NOTE(review): this sleep also runs after the final failed attempt, just
                // before the loop exits and the failure is reported. Kept as-is.
                BACKOFF.sleep(backoffMillis);
            } else {
                // Not recoverable: surface the original exception to the caller untouched.
                throw ex;
            }
        }
    }
    if (getShardIteratorResult == null) {
        throw new RuntimeException(
                "Retries exceeded for getShardIterator operation - all "
                        + getShardIteratorMaxRetries
                        + " retry attempts failed.",
                lastRecoverableException);
    }
    return getShardIteratorResult.getShardIterator();
}