protected DescribeStreamResult describeStream()

in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/proxy/KinesisProxy.java [495:533]


	protected DescribeStreamResult describeStream(String streamName, @Nullable String startShardId)
			throws InterruptedException {
		final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
		describeStreamRequest.setStreamName(streamName);
		describeStreamRequest.setExclusiveStartShardId(startShardId);

		DescribeStreamResult describeStreamResult = null;

		// Call DescribeStream, with full-jitter backoff (if we get LimitExceededException).
		int attemptCount = 0;
		while (describeStreamResult == null) { // retry until we get a result
			try {
				describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
			} catch (LimitExceededException le) {
				long backoffMillis = BACKOFF.calculateFullJitterBackoff(
						describeStreamBaseBackoffMillis,
						describeStreamMaxBackoffMillis,
						describeStreamExpConstant,
						attemptCount++);
				LOG.warn(String.format("Got LimitExceededException when describing stream %s. "
						+ "Backing off for %d millis.", streamName, backoffMillis));
				BACKOFF.sleep(backoffMillis);
			} catch (ResourceNotFoundException re) {
				throw new RuntimeException("Error while getting stream details", re);
			}
		}

		String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus();
		if (!(streamStatus.equals(StreamStatus.ACTIVE.toString())
				|| streamStatus.equals(StreamStatus.UPDATING.toString()))) {
			if (LOG.isWarnEnabled()) {
				LOG.warn(String.format("The status of stream %s is %s ; result of the current "
								+ "describeStream operation will not contain any shard information.",
						streamName, streamStatus));
			}
		}

		return describeStreamResult;
	}