private ListShardsResult listShards()

in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/proxy/KinesisProxy.java [409:478]


	/**
	 * Issues one ListShards request (retrying on throttling and recoverable client errors) and
	 * returns its result.
	 *
	 * <p>When {@code startNextToken} is {@code null} the request is addressed by stream name and
	 * exclusive start shard id; otherwise only the next token is set on the request. A single
	 * ListShards response holds one page of shards (an earlier comment here cites a 1000-entry
	 * limit); this method does not page by itself, so fetching further pages is presumably the
	 * caller's job via {@code startNextToken} - confirm against the caller.
	 *
	 * <p>Failure handling:
	 * <ul>
	 *   <li>{@code LimitExceededException}: full-jitter backoff and retry, up to
	 *       {@code listShardsMaxRetries} retries; returns {@code null} once they are exhausted.</li>
	 *   <li>{@code ResourceInUseException} / {@code ExpiredNextTokenException}: stop immediately
	 *       and return {@code null} so the caller can fall back to previously known state.</li>
	 *   <li>{@code ResourceNotFoundException} / {@code InvalidArgumentException}: rethrown wrapped
	 *       in a {@link RuntimeException}.</li>
	 *   <li>Other {@code SdkClientException}: retried with backoff while recoverable and retries
	 *       remain, otherwise rethrown as-is.</li>
	 * </ul>
	 *
	 * @param streamName the stream to list shards of; also used in log messages
	 * @param startShardId exclusive start shard id, or {@code null} to list from the beginning
	 * @param startNextToken pagination token from a previous call, or {@code null}
	 * @return the ListShards result with shards at or before {@code startShardId} removed, or
	 *         {@code null} if no result could be obtained
	 * @throws InterruptedException declared for the backoff sleep between retries
	 */
	private ListShardsResult listShards(String streamName, @Nullable String startShardId,
																			@Nullable String startNextToken)
			throws InterruptedException {
		final ListShardsRequest listShardsRequest = new ListShardsRequest();
		if (startNextToken == null) {
			listShardsRequest.setExclusiveStartShardId(startShardId);
			listShardsRequest.setStreamName(streamName);
		} else {
			// Note the nextToken returned by AWS expires within 300 sec.
			listShardsRequest.setNextToken(startNextToken);
		}

		ListShardsResult listShardsResults = null;

		// Call ListShards, with full-jitter backoff (if we get LimitExceededException).
		int retryCount = 0;
		while (retryCount <= listShardsMaxRetries && listShardsResults == null) { // retry until we get a result
			try {
				listShardsResults = kinesisClient.listShards(listShardsRequest);
			} catch (LimitExceededException le) {
				long backoffMillis = BACKOFF.calculateFullJitterBackoff(
						listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, retryCount++);
				LOG.warn("Got LimitExceededException when listing shards from stream {}. Backing off for {} millis.",
						streamName, backoffMillis);
				BACKOFF.sleep(backoffMillis);
			} catch (ResourceInUseException reInUse) {
				// List Shards will throw an exception if the stream is not in active state.
				// Return and re-use the previous state available.
				LOG.info("The stream is currently not in active state. Reusing the older state "
						+ "for the time being");
				// The break must be unconditional: this branch does not touch retryCount, so
				// tying it to a log-level check would spin the loop forever (with no backoff)
				// whenever that log level is disabled.
				break;
			} catch (ResourceNotFoundException reNotFound) {
				throw new RuntimeException("Stream not found. Error while getting shard list.", reNotFound);
			} catch (InvalidArgumentException inArg) {
				throw new RuntimeException("Invalid Arguments to listShards.", inArg);
			} catch (ExpiredNextTokenException expiredToken) {
				LOG.warn("List Shards has an expired token. Reusing the previous state.");
				break;
			} catch (SdkClientException ex) {
				// Keep this handler last: the more specific handlers above must get first pick.
				if (retryCount < listShardsMaxRetries && isRecoverableSdkClientException(ex)) {
					long backoffMillis = BACKOFF.calculateFullJitterBackoff(
						listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, retryCount++);
					LOG.warn("Got SdkClientException when listing shards from stream {}. Backing off for {} millis.",
						streamName, backoffMillis);
					BACKOFF.sleep(backoffMillis);
				} else {
					// propagate if retries exceeded or not recoverable
					// (otherwise would return null result and keep trying forever)
					throw ex;
				}
			}
		}

		// Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before
		// the exclusive start shard id in the returned shards list; check if we need to remove
		// these erroneously returned shards.
		// Related issues:
		// 	https://github.com/mhart/kinesalite/pull/77
		// 	https://github.com/lyft/kinesalite/pull/4
		if (startShardId != null && listShardsResults != null) {
			List<Shard> shards = listShardsResults.getShards();
			shards.removeIf(shard -> StreamShardHandle.compareShardIds(shard.getShardId(), startShardId) <= 0);
		}

		return listShardsResults;
	}