in src/main/java/com/amazonaws/services/kinesis/scaling/StreamScalingUtils.java [138:170]
public static List<Shard> listShards(final KinesisClient kinesisClient, final String streamName,
final String shardIdStart) throws Exception {
if (shardIdStart != null) {
LOG.info(String.format("Listing Stream %s from Shard %s", streamName, shardIdStart));
} else {
LOG.info(String.format("Listing Stream %s", streamName));
}
KinesisOperation describe = new KinesisOperation() {
public Object run(KinesisClient client) {
ListShardsRequest.Builder builder = ListShardsRequest.builder().streamName(streamName);
boolean hasMoreResults = true;
List<Shard> shards = new ArrayList<>();
while (hasMoreResults) {
if (shardIdStart != null) {
builder.exclusiveStartShardId(shardIdStart);
}
ListShardsResponse result = client.listShards(builder.build());
shards.addAll(result.shards());
if (result.nextToken() == null) {
hasMoreResults = false;
} else {
builder.nextToken(result.nextToken());
}
}
return shards;
}
};
return (List<Shard>) doOperation(kinesisClient, describe, streamName, DESCRIBE_RETRIES, false);
}