in src/main/java/com/amazonaws/kda/flink/benchmarking/util/KinesisStreamUtil.java [35:69]
/**
 * Collects the starting hash key of every open shard of the given stream.
 *
 * <p>A shard is treated as open when the ending sequence number of its
 * sequence number range is {@code null}. All pages returned by
 * {@code listShards} are walked until no next token is returned.
 *
 * @param kinesis    client used to issue the {@code listShards} calls
 * @param streamName name of the stream whose shards are listed
 * @return starting hash keys of the open shards, in the order the service
 *         returned them; empty if no shard is open
 */
public static List<String> getHashKeysForOpenShards(AmazonKinesis kinesis, String streamName) {
    List<String> hashKeyList = Lists.newArrayList();

    // The first page is requested by stream name.
    ListShardsRequest listShardsRequest = new ListShardsRequest();
    listShardsRequest.setStreamName(streamName);

    String nextToken;
    do {
        ListShardsResult listShardsResult = kinesis.listShards(listShardsRequest);
        for (Shard shard : listShardsResult.getShards()) {
            // No ending sequence number means the shard is still open.
            if (shard.getSequenceNumberRange().getEndingSequenceNumber() == null) {
                hashKeyList.add(shard.getHashKeyRange().getStartingHashKey());
            }
        }

        nextToken = listShardsResult.getNextToken();
        if (nextToken != null) {
            // Follow-up pages are requested with a fresh request that carries
            // the pagination token alone (no stream name).
            listShardsRequest = new ListShardsRequest();
            listShardsRequest.setNextToken(nextToken);
        }
    } while (nextToken != null);

    return hashKeyList;
}