in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java [633:677]
/**
 * Issues a DescribeStream request for the given stream and keeps retrying for as long as the
 * client answers with a {@link LimitExceededException}.
 *
 * <p>After each throttled attempt the method sleeps for a full-jitter backoff derived from the
 * configured describeStream base backoff, max backoff and exponential constant, then tries
 * again. The number of attempts is not bounded.
 *
 * <p>When the reported stream status is neither {@code ACTIVE} nor {@code UPDATING}, a warning
 * is logged and the result is still returned as-is.
 *
 * @param streamName name of the stream to describe
 * @param startShardId value set as the request's exclusive start shard id; may be {@code null}
 * @return the result of the first DescribeStream call that yields a non-null response
 * @throws InterruptedException presumably when the thread is interrupted while sleeping
 *     between retries (confirm against the backoff helper)
 * @throws RuntimeException wrapping a {@code ResourceNotFoundException} raised by the client
 */
protected DescribeStreamResult describeStream(String streamName, @Nullable String startShardId)
        throws InterruptedException {
    final DescribeStreamRequest request = new DescribeStreamRequest();
    request.setStreamName(streamName);
    request.setExclusiveStartShardId(startShardId);

    DescribeStreamResult result = null;
    int throttledAttempts = 0;

    // Keep calling DescribeStream until a result comes back; only throttling is retried.
    do {
        try {
            result = kinesisClient.describeStream(request);
        } catch (LimitExceededException throttled) {
            // The backoff is computed from the number of throttled attempts seen so far,
            // and the counter is bumped afterwards for the next round.
            final long waitMillis =
                    BACKOFF.calculateFullJitterBackoff(
                            describeStreamBaseBackoffMillis,
                            describeStreamMaxBackoffMillis,
                            describeStreamExpConstant,
                            throttledAttempts);
            throttledAttempts++;

            final String throttleMessage =
                    String.format(
                            "Got LimitExceededException when describing stream %s. "
                                    + "Backing off for %d millis.",
                            streamName, waitMillis);
            LOG.warn(throttleMessage);
            BACKOFF.sleep(waitMillis);
        } catch (ResourceNotFoundException missing) {
            // A missing stream is not retried; surface it to the caller unchecked.
            throw new RuntimeException("Error while getting stream details", missing);
        }
    } while (result == null);

    final String status = result.getStreamDescription().getStreamStatus();
    final boolean activeOrUpdating =
            status.equals(StreamStatus.ACTIVE.toString())
                    || status.equals(StreamStatus.UPDATING.toString());

    // Any other status only triggers a warning; the result is handed back regardless.
    if (!activeOrUpdating && LOG.isWarnEnabled()) {
        LOG.warn(
                String.format(
                        "The status of stream %s is %s ; result of the current "
                                + "describeStream operation will not contain any shard information.",
                        streamName, status));
    }
    return result;
}