in src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsProxy.java [211:260]
public synchronized List<Shard> getShardList() {
    // Builds a snapshot of the shard graph and returns the shards it contains.
    //
    // Flow:
    //   1. Create the shard graph if one does not exist yet.
    //   2. Build a snapshot; if that reports STREAM_DISABLED, log and return null.
    //   3. For graphs smaller than MAX_SHARD_COUNT_TO_TRIGGER_RETRIES, retry inconsistency
    //      resolution (with a backoff between attempts) while closed leaf nodes remain and
    //      the retry budget (maxRetriesToResolveInconsistencies) is not exhausted.
    //      For larger graphs no retries are made; remaining closed leaf nodes are only logged.
    //   4. Publish the shards to listOfShardsSinceLastGet, replace the graph with a fresh one,
    //      and return the published list.
    //
    // Returns null (not an empty list) whenever the stream is reported as disabled.
    if (shardGraph == null) {
        shardGraph = new ShardGraph();
    }

    // The graph may already hold entries when this call follows DescribeStream throttling.
    // In that situation many leaf nodes will appear closed because their descendants were
    // never discovered. Nothing special is done about it here; the inconsistency-resolution
    // attempts below are left to sort it out.
    if (buildShardGraphSnapshot() == ShardGraphProcessingResult.STREAM_DISABLED) {
        LOG.info("Stream was disabled during getShardList operation.");
        return null;
    }

    if (shardGraph.size() >= MAX_SHARD_COUNT_TO_TRIGGER_RETRIES) {
        // Graph is at or above the retry threshold: skip retries and just report what is left.
        if (shardGraph.closedLeafNodeCount() > 0) {
            LOG.debug(String.format("Returning shard list with %s closed leaf node shards.",
                    shardGraph.closedLeafNodeCount()));
        }
    } else {
        // The counter is declared outside the loop because it is inspected afterwards to tell
        // "budget exhausted" apart from "resolved early".
        int attempt = 0;
        for (; shardGraph.closedLeafNodeCount() > 0 && attempt < maxRetriesToResolveInconsistencies; attempt++) {
            final long delayMillis = getInconsistencyBackoffTimeInMillis(attempt);
            LOG.info(String.format("Inconsistency resolution retry attempt: %d. Backing off for %d millis.",
                    attempt, delayMillis));
            sleeper.sleep(delayMillis);

            final ShardGraphProcessingResult outcome = resolveInconsistenciesInShardGraph();
            if (outcome.equals(ShardGraphProcessingResult.STREAM_DISABLED)) {
                LOG.info("Stream was disabled during getShardList operation.");
                return null;
            }
            if (outcome.equals(ShardGraphProcessingResult.RESOLVED_INCONSISTENCIES_AND_ABORTED)) {
                // Leaving via break skips the loop increment, so the attempt that succeeded
                // is reported as attempt + 1.
                LOG.info(String.format("An intermediate page in DescribeStream response resolved inconsistencies. "
                        + "Total retry attempts taken to resolve inconsistencies: %d", attempt + 1));
                break;
            }
        }

        if (attempt == maxRetriesToResolveInconsistencies && shardGraph.closedLeafNodeCount() > 0) {
            LOG.warn("Inconsistencies in the shard graph were not resolved after exhausting all retries.");
        }
    }

    // Publish the result first, then start over with an empty graph for the next call.
    this.listOfShardsSinceLastGet.set(shardGraph.getShards());
    this.shardGraph = new ShardGraph();
    return listOfShardsSinceLastGet.get();
}