in src/main/java/com/amazonaws/services/kinesis/aggregators/StreamAggregatorUtils.java [311:380]
/**
 * Returns the open shards of a stream, ordered by ascending starting hash
 * key.
 * <p>
 * The stream is described via
 * {@code kinesisClient.describeStream(streamName)}. When the call fails
 * with a {@link LimitExceededException} it is retried with exponential
 * backoff (10ms, 20ms, 40ms, ...) for up to 10 attempts in total. A shard
 * is treated as open when no other shard in the description names it as
 * its parent or adjacent parent.
 *
 * @param kinesisClient
 *            the client used to describe the stream
 * @param streamName
 *            the name of the stream to describe
 * @return an insertion-ordered map of shard ID to shard containing only
 *         the open shards, iterating from the lowest starting hash key
 *         to the highest
 * @throws InterruptedException
 *             if the thread is interrupted while backing off between
 *             describe attempts (logged, then rethrown)
 * @throws Exception
 *             if the stream could not be described within the maximum
 *             number of attempts, or if the client raises any other
 *             error
 */
public static Map<String, Shard> getOpenShards(
        AmazonKinesisClient kinesisClient, String streamName)
        throws Exception {
    final int BACKOFF_MILLIS = 10;
    final int MAX_DESCRIBE_ATTEMPTS = 10;
    int describeAttempts = 0;
    StreamDescription stream = null;

    try {
        do {
            try {
                stream = kinesisClient.describeStream(streamName)
                        .getStreamDescription();
            } catch (LimitExceededException e) {
                describeAttempts++;
                // Only back off when another attempt will follow. The
                // delay doubles on each retry; a left shift is used
                // because '^' is bitwise XOR in Java, not a power
                // operator.
                if (describeAttempts < MAX_DESCRIBE_ATTEMPTS) {
                    Thread.sleep((1L << (describeAttempts - 1))
                            * BACKOFF_MILLIS);
                }
            }
        } while (stream == null && describeAttempts < MAX_DESCRIBE_ATTEMPTS);
    } catch (InterruptedException e) {
        LOG.error(e);
        throw e;
    }

    if (stream == null) {
        throw new Exception(String.format(
                "Unable to describe Stream after %s attempts",
                MAX_DESCRIBE_ATTEMPTS));
    }

    // NOTE(review): only the shards returned by this single describeStream
    // call are considered. If the service paginates the shard list, later
    // pages are not read here - confirm whether pagination is required.

    // index every shard on the stream by its ID, preserving listing order
    Map<String, Shard> allShards = new LinkedHashMap<>();
    for (Shard shard : stream.getShards()) {
        allShards.put(shard.getShardId(), shard);
    }

    // Start from all shards and drop every shard that is referenced as a
    // parent: parents are closed and we can't do anything to them. Doing
    // this in a second pass makes the result independent of the order in
    // which parents and children are listed.
    Map<String, Shard> openShards = new LinkedHashMap<>(allShards);
    for (Shard shard : allShards.values()) {
        if (shard.getParentShardId() != null) {
            openShards.remove(shard.getParentShardId());
        }
        if (shard.getAdjacentParentShardId() != null) {
            openShards.remove(shard.getAdjacentParentShardId());
        }
    }

    // sort the open shards into lowest start hash order; hash keys are
    // decimal strings too large for a long, hence BigInteger
    List<Shard> shards = new ArrayList<Shard>(openShards.values());
    Collections.sort(shards, new Comparator<Shard>() {
        @Override
        public int compare(Shard o1, Shard o2) {
            return new BigInteger(o1.getHashKeyRange().getStartingHashKey())
                    .compareTo(new BigInteger(o2.getHashKeyRange()
                            .getStartingHashKey()));
        }
    });

    // rebuild the shard map in sorted order
    Map<String, Shard> shardMap = new LinkedHashMap<>();
    for (Shard s : shards) {
        shardMap.put(s.getShardId(), s);
    }
    return shardMap;
}