private ListShardsResponse listShards()

in amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java [188:251]


    /**
     * Issues a single (possibly retried) ListShards call for this detector's stream.
     *
     * <p>When {@code nextToken} is empty the request is built from the stream name and
     * {@code shardFilter}; otherwise only {@code nextToken} is set and {@code shardFilter}
     * is not sent.
     *
     * <p>A {@link LimitExceededException} is retried after sleeping
     * {@code listShardsBackoffTimeInMillis}, for at most {@code maxListShardsRetryAttempts}
     * attempts. A null response from {@code getListShardsResponse} also consumes an attempt,
     * but without any back-off.
     *
     * @param shardFilter filter applied to the first page of results; ignored when
     *                    {@code nextToken} is non-empty
     * @param nextToken   pagination token from a previous response, or null/empty for the first page
     * @return the response; an empty response (no shards, null token) if the stream was not
     *         found; or {@code null} if the stream is in use (not Active/Updating) or the
     *         calling thread was interrupted
     * @throws LimitExceededException if every attempt was throttled
     * @throws IllegalStateException  if all attempts completed without a response and without
     *                                being throttled
     * @throws RuntimeException       wrapping a {@link TimeoutException} raised by the call; other
     *                                failures surface as whatever the exception manager maps them to
     */
    private ListShardsResponse listShards(ShardFilter shardFilter, final String nextToken) {
        final AWSExceptionManager exceptionManager = new AWSExceptionManager();
        exceptionManager.add(ResourceNotFoundException.class, t -> t);
        exceptionManager.add(LimitExceededException.class, t -> t);
        exceptionManager.add(ResourceInUseException.class, t -> t);
        exceptionManager.add(KinesisException.class, t -> t);

        ListShardsRequest.Builder builder = KinesisRequestsBuilder.listShardsRequestBuilder();
        if (StringUtils.isEmpty(nextToken)) {
            // First page: identify the stream and apply the filter.
            builder = builder.streamName(streamIdentifier.streamName()).shardFilter(shardFilter);
        } else {
            // Continuation page: the token alone identifies the listing being continued.
            builder = builder.nextToken(nextToken);
        }

        final ListShardsRequest request = builder.build();
        log.info("Stream {}: listing shards with list shards request {}", streamIdentifier, request);

        ListShardsResponse result = null;
        LimitExceededException lastException = null;
        int remainingRetries = maxListShardsRetryAttempts;

        while (result == null) {
            try {
                // The inner try must stay nested: the exception rethrown from the
                // ExecutionException handler has to be catchable by the outer handlers below,
                // which sibling catch clauses of the same try could not do.
                try {
                    result = getListShardsResponse(request);
                } catch (ExecutionException e) {
                    throw exceptionManager.apply(e.getCause());
                } catch (InterruptedException e) {
                    // TODO: check if this is the correct behavior for Interrupted Exception
                    log.debug("Interrupted exception caught, shutdown initiated, returning null");
                    // Restore the interrupt flag so callers further up can observe the shutdown request.
                    Thread.currentThread().interrupt();
                    return null;
                }
            } catch (ResourceInUseException e) {
                log.info("Stream is not in Active/Updating status, returning null (wait until stream is in"
                        + " Active or Updating)");
                return null;
            } catch (LimitExceededException e) {
                log.info("Got LimitExceededException when listing shards {}. Backing off for {} millis.", streamIdentifier,
                        listShardsBackoffTimeInMillis);
                try {
                    Thread.sleep(listShardsBackoffTimeInMillis);
                } catch (InterruptedException ie) {
                    log.debug("Stream {} : Sleep  was interrupted ", streamIdentifier, ie);
                    // An interrupt during back-off is handled like an interrupt during the call
                    // itself: preserve the flag and stop retrying instead of silently carrying on.
                    Thread.currentThread().interrupt();
                    return null;
                }
                lastException = e;
            } catch (ResourceNotFoundException e) {
                log.warn("Got ResourceNotFoundException when fetching shard list for {}. Stream no longer exists.",
                        streamIdentifier.streamName());
                return ListShardsResponse.builder().shards(Collections.emptyList())
                        .nextToken(null)
                        .build();
            } catch (TimeoutException te) {
                throw new RuntimeException(te);
            }
            remainingRetries--;
            if (remainingRetries <= 0 && result == null) {
                // Out of attempts: prefer surfacing the throttling error when that was the cause.
                if (lastException != null) {
                    throw lastException;
                }
                throw new IllegalStateException("Received null from ListShards call.");
            }
        }
        return result;
    }