public static List getHashKeysForOpenShards()

in src/main/java/com/amazonaws/kda/flink/benchmarking/util/KinesisStreamUtil.java [35:69]


	/**
	 * Collects the starting hash key of every shard of the given stream that
	 * has no ending sequence number, walking through all pages returned by
	 * {@code listShards}.
	 *
	 * <p>A shard is treated as open when
	 * {@code getSequenceNumberRange().getEndingSequenceNumber()} is
	 * {@code null}.
	 *
	 * @param kinesis    client used to issue the {@code listShards} calls
	 * @param streamName name of the stream set on the first request
	 * @return starting hash keys of the matching shards, in the order the
	 *         shards were returned; empty if no shard matched
	 */
	public static List<String> getHashKeysForOpenShards(AmazonKinesis kinesis, String streamName) {
		List<String> openShardHashKeys = Lists.newArrayList();

		// The first page is requested by stream name.
		ListShardsRequest pageRequest = new ListShardsRequest();
		pageRequest.setStreamName(streamName);

		// Keep fetching pages until the service stops handing back a token.
		while (pageRequest != null) {
			ListShardsResult page = kinesis.listShards(pageRequest);

			for (Shard shard : page.getShards()) {
				String endingSequenceNumber = shard.getSequenceNumberRange().getEndingSequenceNumber();
				if (endingSequenceNumber != null) {
					// Shard has an ending sequence number: not collected.
					continue;
				}
				openShardHashKeys.add(shard.getHashKeyRange().getStartingHashKey());
			}

			String continuationToken = page.getNextToken();
			if (continuationToken == null) {
				// No further pages.
				pageRequest = null;
			} else {
				// Follow-up pages are requested with the token alone; the
				// stream name is deliberately not set again.
				pageRequest = new ListShardsRequest();
				pageRequest.setNextToken(continuationToken);
			}
		}

		return openShardHashKeys;
	}