in amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java [188:251]
private ListShardsResponse listShards(ShardFilter shardFilter, final String nextToken) {
final AWSExceptionManager exceptionManager = new AWSExceptionManager();
exceptionManager.add(ResourceNotFoundException.class, t -> t);
exceptionManager.add(LimitExceededException.class, t -> t);
exceptionManager.add(ResourceInUseException.class, t -> t);
exceptionManager.add(KinesisException.class, t -> t);
ListShardsRequest.Builder builder = KinesisRequestsBuilder.listShardsRequestBuilder();
if (StringUtils.isEmpty(nextToken)) {
builder = builder.streamName(streamIdentifier.streamName()).shardFilter(shardFilter);
} else {
builder = builder.nextToken(nextToken);
}
final ListShardsRequest request = builder.build();
log.info("Stream {}: listing shards with list shards request {}", streamIdentifier, request);
ListShardsResponse result = null;
LimitExceededException lastException = null;
int remainingRetries = maxListShardsRetryAttempts;
while (result == null) {
try {
try {
result = getListShardsResponse(request);
} catch (ExecutionException e) {
throw exceptionManager.apply(e.getCause());
} catch (InterruptedException e) {
// TODO: check if this is the correct behavior for Interrupted Exception
log.debug("Interrupted exception caught, shutdown initiated, returning null");
return null;
}
} catch (ResourceInUseException e) {
log.info("Stream is not in Active/Updating status, returning null (wait until stream is in"
+ " Active or Updating)");
return null;
} catch (LimitExceededException e) {
log.info("Got LimitExceededException when listing shards {}. Backing off for {} millis.", streamIdentifier,
listShardsBackoffTimeInMillis);
try {
Thread.sleep(listShardsBackoffTimeInMillis);
} catch (InterruptedException ie) {
log.debug("Stream {} : Sleep was interrupted ", streamIdentifier, ie);
}
lastException = e;
} catch (ResourceNotFoundException e) {
log.warn("Got ResourceNotFoundException when fetching shard list for {}. Stream no longer exists.",
streamIdentifier.streamName());
return ListShardsResponse.builder().shards(Collections.emptyList())
.nextToken(null)
.build();
} catch (TimeoutException te) {
throw new RuntimeException(te);
}
remainingRetries--;
if (remainingRetries <= 0 && result == null) {
if (lastException != null) {
throw lastException;
}
throw new IllegalStateException("Received null from ListShards call.");
}
}
return result;
}