private ShardGraphProcessingResult resolveInconsistenciesInShardGraph()

in src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsProxy.java [298:324]


    /**
     * Attempts to repair a shard graph that has been flagged as inconsistent by fetching
     * further pages of shards and feeding them to {@code shardGraph.addToClosedLeafNodes}.
     *
     * <p>The first page is requested with the id returned by
     * {@code shardGraph.getEarliestClosedLeafNodeId()}; every following page is requested with
     * {@code shardGraph.getLastFetchedShardId()}. (Presumably the id is used as an exclusive
     * start marker by {@code getStreamInfo} - confirm against that method.)
     *
     * @return <ul>
     *     <li>{@code STREAM_DISABLED} if {@code getStreamInfo} returns {@code null};</li>
     *     <li>{@code RESOLVED_INCONSISTENCIES_AND_ABORTED} as soon as the graph reports zero
     *         closed leaf nodes after a page has been added;</li>
     *     <li>{@code FETCHED_ALL_AVAILABLE_SHARDS} once a page reports that no more shards
     *         are available while closed leaf nodes still remain.</li>
     * </ul>
     */
    private ShardGraphProcessingResult resolveInconsistenciesInShardGraph() {
        LOG.warn(String.format("Inconsistent shard graph state detected. "
            + "Fetched: %d shards. Closed leaves: %d shards", shardGraph.size(), shardGraph.closedLeafNodeCount()));
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Following leaf node shards are closed: %s",
                String.join(", ", shardGraph.getAllClosedLeafNodeIds())));
        }

        String nextStartShardId = shardGraph.getEarliestClosedLeafNodeId();
        while (true) {
            final DescribeStreamResult page = getStreamInfo(nextStartShardId);
            if (page == null) {
                // No stream information could be obtained; reported to the caller as a disabled stream.
                return ShardGraphProcessingResult.STREAM_DISABLED;
            }

            shardGraph.addToClosedLeafNodes(page.getStreamDescription().getShards());
            LOG.debug(String.format("Resolving inconsistencies in shard graph; total shard count: %d",
                shardGraph.size()));

            // Stop paging early once the newly added shards leave no closed leaf nodes.
            if (shardGraph.closedLeafNodeCount() == 0) {
                return ShardGraphProcessingResult.RESOLVED_INCONSISTENCIES_AND_ABORTED;
            }

            // Advance the cursor before testing for further pages, keeping the same call order
            // on shardGraph as a do/while with the check at the bottom.
            nextStartShardId = shardGraph.getLastFetchedShardId();
            if (!page.getStreamDescription().isHasMoreShards()) {
                return ShardGraphProcessingResult.FETCHED_ALL_AVAILABLE_SHARDS;
            }
        }
    }