protected DescribeStreamResult describeStream()

in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java [633:677]


    /**
     * Issues a DescribeStream call for the given stream and returns its result.
     *
     * <p>The call is repeated until a non-null result is obtained. Whenever the client throws
     * {@code LimitExceededException}, a full-jitter backoff is computed from {@code
     * describeStreamBaseBackoffMillis}, {@code describeStreamMaxBackoffMillis}, {@code
     * describeStreamExpConstant} and the number of backoffs performed so far, a warning is
     * logged, and the method sleeps for that long before trying again. The number of attempts
     * is not bounded.
     *
     * <p>After a result is obtained, a warning is logged if the reported stream status is
     * neither {@code ACTIVE} nor {@code UPDATING}; the result is returned either way.
     *
     * @param streamName name of the stream to describe
     * @param startShardId value passed as the request's exclusive start shard id; may be {@code
     *     null}
     * @return the result returned by the Kinesis client
     * @throws InterruptedException declared for the backoff wait (presumably raised when the
     *     thread is interrupted while sleeping)
     * @throws RuntimeException wrapping a {@code ResourceNotFoundException} raised by the client
     */
    protected DescribeStreamResult describeStream(String streamName, @Nullable String startShardId)
            throws InterruptedException {
        final DescribeStreamRequest request = new DescribeStreamRequest();
        request.setStreamName(streamName);
        request.setExclusiveStartShardId(startShardId);

        // Number of backoffs already performed; feeds the exponential part of the jitter.
        int backoffRound = 0;
        DescribeStreamResult response = null;

        // Keep calling DescribeStream until a result comes back; only throttling is retried.
        do {
            try {
                response = kinesisClient.describeStream(request);
            } catch (LimitExceededException throttled) {
                final long sleepMillis =
                        BACKOFF.calculateFullJitterBackoff(
                                describeStreamBaseBackoffMillis,
                                describeStreamMaxBackoffMillis,
                                describeStreamExpConstant,
                                backoffRound++);
                final String throttleMessage =
                        String.format(
                                "Got LimitExceededException when describing stream %s. "
                                        + "Backing off for %d millis.",
                                streamName, sleepMillis);
                LOG.warn(throttleMessage);
                BACKOFF.sleep(sleepMillis);
            } catch (ResourceNotFoundException missing) {
                // Not retried: surface it to the caller as an unchecked exception.
                throw new RuntimeException("Error while getting stream details", missing);
            }
        } while (response == null);

        final String status = response.getStreamDescription().getStreamStatus();
        final boolean activeOrUpdating =
                status.equals(StreamStatus.ACTIVE.toString())
                        || status.equals(StreamStatus.UPDATING.toString());

        if (!activeOrUpdating && LOG.isWarnEnabled()) {
            final String statusMessage =
                    String.format(
                            "The status of stream %s is %s ; result of the current "
                                    + "describeStream operation will not contain any shard information.",
                            streamName, status);
            LOG.warn(statusMessage);
        }

        return response;
    }