in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/proxy/KinesisProxy.java [409:478]
/**
 * Issues one ListShards request for the given stream, retrying the same request with
 * full-jitter backoff on throttling and on recoverable client-side errors.
 *
 * <p>If {@code startNextToken} is {@code null}, the request is built from {@code streamName}
 * and {@code startShardId}; otherwise only the next token is set on the request.
 *
 * @param streamName name of the stream to list shards for; also used in log messages
 * @param startShardId exclusive start shard id, may be {@code null}; when non-null, shards
 *     whose id compares less than or equal to it are removed from the returned result
 * @param startNextToken pagination token from a previous call, may be {@code null}
 * @return the ListShards result, or {@code null} if no result could be obtained: retries were
 *     exhausted on {@code LimitExceededException}, the stream was reported as in use
 *     ({@code ResourceInUseException}), or the next token had expired
 * @throws InterruptedException presumably if the backoff sleep is interrupted — confirm
 *     against the backoff helper
 * @throws RuntimeException if the stream is not found or the request arguments are invalid
 */
private ListShardsResult listShards(String streamName, @Nullable String startShardId,
        @Nullable String startNextToken)
        throws InterruptedException {
    final ListShardsRequest listShardsRequest = new ListShardsRequest();
    if (startNextToken == null) {
        listShardsRequest.setExclusiveStartShardId(startShardId);
        listShardsRequest.setStreamName(streamName);
    } else {
        // Note the nextToken returned by AWS expires within 300 sec.
        listShardsRequest.setNextToken(startNextToken);
    }

    ListShardsResult listShardsResults = null;

    // Call ListShards, with full-jitter backoff (if we get LimitExceededException).
    // This loop only retries the same request; it does not page through results. A single
    // ListShards call returns a bounded number of shard entries, so paging via the next
    // token is presumably driven by the caller.
    int retryCount = 0;
    while (retryCount <= listShardsMaxRetries && listShardsResults == null) {
        try {
            listShardsResults = kinesisClient.listShards(listShardsRequest);
        } catch (LimitExceededException le) {
            long backoffMillis = BACKOFF.calculateFullJitterBackoff(
                    listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, retryCount++);
            LOG.warn("Got LimitExceededException when listing shards from stream {}. Backing off for {} millis.",
                    streamName, backoffMillis);
            BACKOFF.sleep(backoffMillis);
        } catch (ResourceInUseException reInUse) {
            // List Shards will throw an exception if stream is not in active state.
            // Return and re-use previous state available.
            // The break must not depend on the logger configuration: retryCount is not
            // incremented here, so skipping the break would re-issue the call without
            // backoff for as long as the stream stays non-active.
            LOG.info("The stream is currently not in active state. Reusing the older state "
                    + "for the time being");
            break;
        } catch (ResourceNotFoundException reNotFound) {
            throw new RuntimeException("Stream not found. Error while getting shard list.", reNotFound);
        } catch (InvalidArgumentException inArg) {
            throw new RuntimeException("Invalid Arguments to listShards.", inArg);
        } catch (ExpiredNextTokenException expiredToken) {
            LOG.warn("List Shards has an expired token. Reusing the previous state.");
            break;
        } catch (SdkClientException ex) {
            if (retryCount < listShardsMaxRetries && isRecoverableSdkClientException(ex)) {
                long backoffMillis = BACKOFF.calculateFullJitterBackoff(
                        listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, retryCount++);
                LOG.warn("Got SdkClientException when listing shards from stream {}. Backing off for {} millis.",
                        streamName, backoffMillis);
                BACKOFF.sleep(backoffMillis);
            } else {
                // propagate if retries exceeded or not recoverable
                // (otherwise would return null result and keep trying forever)
                throw ex;
            }
        }
    }

    // Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before
    // the exclusive start shard id in the returned shards list; check if we need to remove
    // these erroneously returned shards.
    // Related issues:
    //  https://github.com/mhart/kinesalite/pull/77
    //  https://github.com/lyft/kinesalite/pull/4
    if (startShardId != null && listShardsResults != null) {
        List<Shard> shards = listShardsResults.getShards();
        shards.removeIf(shard -> StreamShardHandle.compareShardIds(shard.getShardId(), startShardId) <= 0);
    }

    return listShardsResults;
}