in kinesis-taxi-stream-producer/src/main/java/com/amazonaws/flink/refarch/utils/WatermarkTracker.java [109:133]
/**
 * Re-reads the shard list of {@code streamName} through {@code kinesisClient}
 * and stores it in {@code this.shards}.
 *
 * <p>The listing is fetched page by page: the first request names the stream,
 * each later request carries only the continuation token returned by the
 * previous page. The field is replaced only after the final page has been
 * read, so a failure part-way through never publishes a partial list.
 *
 * <p>A {@link LimitExceededException} or {@link ResourceInUseException} is
 * logged at debug level and otherwise ignored; the previously stored shard
 * list stays in place until a later call succeeds.
 */
private void refreshShards() {
    try {
        final List<Shard> collected = new ArrayList<>();
        String pageToken = "";
        boolean morePages = true;

        while (morePages) {
            final ListShardsRequest pageRequest = new ListShardsRequest();

            // A continuation token, once available, is sent on its own;
            // only the very first request is addressed by stream name.
            if (!StringUtils.isEmpty(pageToken)) {
                pageRequest.setNextToken(pageToken);
            } else {
                pageRequest.setStreamName(streamName);
            }

            final ListShardsResult page = kinesisClient.listShards(pageRequest);
            collected.addAll(page.getShards());

            pageToken = page.getNextToken();
            morePages = !StringUtils.isEmpty(pageToken);
        }

        // Publish only a complete listing.
        this.shards = collected;
    } catch (LimitExceededException | ResourceInUseException e) {
        // Throttled or stream busy: keep the cached shard list and let a later call retry.
        LOG.debug("skipping watermark due to limit exceeded/resource in use exception");
    }
}