private ListShardsResult listShards()

in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java [523:617]


    private ListShardsResult listShards(
            String streamName,
            String streamArn,
            @Nullable String startShardId,
            @Nullable String startNextToken)
            throws InterruptedException {
        final ListShardsRequest listShardsRequest = new ListShardsRequest();
        if (startNextToken == null) {
            listShardsRequest.setExclusiveStartShardId(startShardId);
            listShardsRequest.setStreamName(streamName);
            listShardsRequest.setStreamARN(streamArn);
        } else {
            // Note the nextToken returned by AWS expires within 300 sec.
            listShardsRequest.setNextToken(startNextToken);
        }

        ListShardsResult listShardsResults = null;

        // Call ListShards, with full-jitter backoff (if we get LimitExceededException).
        int retryCount = 0;
        // List Shards returns just the first 1000 shard entries. Make sure that all entries
        // are taken up.
        while (retryCount <= listShardsMaxRetries
                && listShardsResults == null) { // retry until we get a result
            try {

                listShardsResults = kinesisClient.listShards(listShardsRequest);
            } catch (LimitExceededException le) {
                long backoffMillis =
                        BACKOFF.calculateFullJitterBackoff(
                                listShardsBaseBackoffMillis,
                                listShardsMaxBackoffMillis,
                                listShardsExpConstant,
                                retryCount++);
                LOG.warn(
                        "Got LimitExceededException when listing shards from stream "
                                + streamArn
                                + ". Backing off for "
                                + backoffMillis
                                + " millis.");
                BACKOFF.sleep(backoffMillis);
            } catch (ResourceInUseException reInUse) {
                if (LOG.isWarnEnabled()) {
                    // List Shards will throw an exception if stream in not in active state. Return
                    // and re-use previous state available.
                    LOG.info(
                            "The stream is currently not in active state. Reusing the older state "
                                    + "for the time being");
                    break;
                }
            } catch (ResourceNotFoundException reNotFound) {
                throw new RuntimeException(
                        "Stream not found. Error while getting shard list.", reNotFound);
            } catch (InvalidArgumentException inArg) {
                throw new RuntimeException("Invalid Arguments to listShards.", inArg);
            } catch (ExpiredNextTokenException expiredToken) {
                LOG.warn("List Shards has an expired token. Reusing the previous state.");
                break;
            } catch (SdkClientException ex) {
                if (retryCount < listShardsMaxRetries && isRecoverableSdkClientException(ex)) {
                    long backoffMillis =
                            BACKOFF.calculateFullJitterBackoff(
                                    listShardsBaseBackoffMillis,
                                    listShardsMaxBackoffMillis,
                                    listShardsExpConstant,
                                    retryCount++);
                    LOG.warn(
                            "Got SdkClientException when listing shards from stream {}. Backing off for {} millis.",
                            streamArn,
                            backoffMillis);
                    BACKOFF.sleep(backoffMillis);
                } else {
                    // propagate if retries exceeded or not recoverable
                    // (otherwise would return null result and keep trying forever)
                    throw ex;
                }
            }
        }

        // Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before
        // the exclusive start shard id in the returned shards list; check if we need to remove
        // these erroneously returned shards.
        // Related issues:
        // 	https://github.com/mhart/kinesalite/pull/77
        // 	https://github.com/lyft/kinesalite/pull/4
        if (startShardId != null && listShardsResults != null) {
            List<Shard> shards = listShardsResults.getShards();
            shards.removeIf(
                    shard ->
                            StreamShardHandle.compareShardIds(shard.getShardId(), startShardId)
                                    <= 0);
        }

        return listShardsResults;
    }