in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java [288:334]
public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet)
throws InterruptedException {
final GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
getRecordsRequest.setShardIterator(shardIterator);
getRecordsRequest.setLimit(maxRecordsToGet);
GetRecordsResult getRecordsResult = null;
int retryCount = 0;
while (retryCount <= getRecordsMaxRetries && getRecordsResult == null) {
try {
getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
} catch (SdkClientException ex) {
if (isRecoverableSdkClientException(ex)) {
long backoffMillis =
BACKOFF.calculateFullJitterBackoff(
getRecordsBaseBackoffMillis,
getRecordsMaxBackoffMillis,
getRecordsExpConstant,
retryCount++);
LOG.warn(
"Got recoverable SdkClientException. Backing off for "
+ backoffMillis
+ " millis ("
+ ex.getClass().getName()
+ ": "
+ ex.getMessage()
+ ")");
BACKOFF.sleep(backoffMillis);
} else {
throw ex;
}
} catch (RuntimeException ex) {
LOG.error("Encountered non-recoverable error while invoking getRecords.", ex);
throw ex;
}
}
if (getRecordsResult == null) {
throw new RuntimeException(
"Retries exceeded for getRecords operation - all "
+ getRecordsMaxRetries
+ " retry attempts failed.");
}
return getRecordsResult;
}