in src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsProxy.java [298:324]
private ShardGraphProcessingResult resolveInconsistenciesInShardGraph() {
DescribeStreamResult response;
final String warnMsg = String.format("Inconsistent shard graph state detected. "
+ "Fetched: %d shards. Closed leaves: %d shards", shardGraph.size(), shardGraph.closedLeafNodeCount());
LOG.warn(warnMsg);
if (LOG.isDebugEnabled()) {
final String debugMsg = String.format("Following leaf node shards are closed: %s",
String.join(", ", shardGraph.getAllClosedLeafNodeIds()));
LOG.debug(debugMsg);
}
String exclusiveStartShardId = shardGraph.getEarliestClosedLeafNodeId();
do {
response = getStreamInfo(exclusiveStartShardId);
if (response == null) {
return ShardGraphProcessingResult.STREAM_DISABLED;
} else {
shardGraph.addToClosedLeafNodes(response.getStreamDescription().getShards());
LOG.debug(String.format("Resolving inconsistencies in shard graph; total shard count: %d",
shardGraph.size()));
if (shardGraph.closedLeafNodeCount() == 0) {
return ShardGraphProcessingResult.RESOLVED_INCONSISTENCIES_AND_ABORTED;
}
exclusiveStartShardId = shardGraph.getLastFetchedShardId();
}
} while (response.getStreamDescription().isHasMoreShards());
return ShardGraphProcessingResult.FETCHED_ALL_AVAILABLE_SHARDS;
}