in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java [523:617]
/**
 * Issues a single {@code ListShards} call against Kinesis, retrying with full-jitter backoff on
 * throttling and on recoverable client-side errors.
 *
 * <p>When {@code startNextToken} is {@code null} the request is addressed by stream name / ARN
 * and (optionally) an exclusive start shard id. Otherwise only the token is sent and the
 * stream identifiers and start shard id are not set on the request.
 *
 * @param streamName name of the stream; only set on the request when no next token is given
 * @param streamArn ARN of the stream; only set on the request when no next token is given, and
 *     used in log messages
 * @param startShardId exclusive start shard id, or {@code null} to list from the beginning
 * @param startNextToken pagination token from a previous call, or {@code null} for a fresh
 *     listing
 * @return the result of the call, or {@code null} if the stream is not in an active state, the
 *     next token has expired, or the retry budget was used up by {@code
 *     LimitExceededException}s; callers are expected to fall back to previously known state
 * @throws InterruptedException declared by this method; presumably raised when a backoff sleep
 *     is interrupted
 * @throws RuntimeException if the stream does not exist or the request arguments are invalid
 * @throws SdkClientException if a client-side failure is not recoverable or retries are
 *     exhausted
 */
private ListShardsResult listShards(
        String streamName,
        String streamArn,
        @Nullable String startShardId,
        @Nullable String startNextToken)
        throws InterruptedException {
    final ListShardsRequest listShardsRequest = new ListShardsRequest();
    if (startNextToken == null) {
        listShardsRequest.setExclusiveStartShardId(startShardId);
        listShardsRequest.setStreamName(streamName);
        listShardsRequest.setStreamARN(streamArn);
    } else {
        // Note the nextToken returned by AWS expires within 300 sec.
        listShardsRequest.setNextToken(startNextToken);
    }

    ListShardsResult listShardsResults = null;

    // Call ListShards, with full-jitter backoff (if we get LimitExceededException).
    // This loop only retries the single request built above; it does not paginate.
    int retryCount = 0;
    while (retryCount <= listShardsMaxRetries
            && listShardsResults == null) { // retry until we get a result
        try {
            listShardsResults = kinesisClient.listShards(listShardsRequest);
        } catch (LimitExceededException le) {
            long backoffMillis =
                    BACKOFF.calculateFullJitterBackoff(
                            listShardsBaseBackoffMillis,
                            listShardsMaxBackoffMillis,
                            listShardsExpConstant,
                            retryCount++);
            LOG.warn(
                    "Got LimitExceededException when listing shards from stream {}. Backing off for {} millis.",
                    streamArn,
                    backoffMillis);
            BACKOFF.sleep(backoffMillis);
        } catch (ResourceInUseException reInUse) {
            // List Shards will throw an exception if stream in not in active state. Return
            // and re-use previous state available.
            LOG.info(
                    "The stream is currently not in active state. Reusing the older state "
                            + "for the time being");
            // Must leave the loop regardless of the configured log level: retryCount is not
            // advanced on this path, so continuing would re-issue the call in a tight loop
            // without any backoff.
            break;
        } catch (ResourceNotFoundException reNotFound) {
            throw new RuntimeException(
                    "Stream not found. Error while getting shard list.", reNotFound);
        } catch (InvalidArgumentException inArg) {
            throw new RuntimeException("Invalid Arguments to listShards.", inArg);
        } catch (ExpiredNextTokenException expiredToken) {
            LOG.warn("List Shards has an expired token. Reusing the previous state.");
            break;
        } catch (SdkClientException ex) {
            if (retryCount < listShardsMaxRetries && isRecoverableSdkClientException(ex)) {
                long backoffMillis =
                        BACKOFF.calculateFullJitterBackoff(
                                listShardsBaseBackoffMillis,
                                listShardsMaxBackoffMillis,
                                listShardsExpConstant,
                                retryCount++);
                LOG.warn(
                        "Got SdkClientException when listing shards from stream {}. Backing off for {} millis.",
                        streamArn,
                        backoffMillis);
                BACKOFF.sleep(backoffMillis);
            } else {
                // propagate if retries exceeded or not recoverable
                // (otherwise would return null result and keep trying forever)
                throw ex;
            }
        }
    }

    // Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before
    // the exclusive start shard id in the returned shards list; check if we need to remove
    // these erroneously returned shards.
    // Related issues:
    // https://github.com/mhart/kinesalite/pull/77
    // https://github.com/lyft/kinesalite/pull/4
    if (startShardId != null && listShardsResults != null) {
        List<Shard> shards = listShardsResults.getShards();
        shards.removeIf(
                shard ->
                        StreamShardHandle.compareShardIds(shard.getShardId(), startShardId)
                                <= 0);
    }
    return listShardsResults;
}