private FanOutShardSubscription openSubscriptionToShard()

in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/internals/publisher/fanout/FanOutShardSubscriber.java [194:243]


	/**
	 * Issues a {@code SubscribeToShard} request for this shard and blocks until the wait latch is released
	 * or the configured subscribe timeout elapses.
	 *
	 * <p>The latch is handed to the {@link FanOutShardSubscription} on construction (presumably it is
	 * released there once the subscription is open - confirm against that class). It is also released by
	 * the error callback registered here when an error arrives while the latch is still closed.
	 *
	 * <p>Once the wait is over, the first record is requested on the calling thread before the
	 * subscription is returned.
	 *
	 * @param startingPosition the position in the shard at which the subscription should start
	 * @return the subscription, after the first record has been requested
	 * @throws FanOutSubscriberException propagated from {@code handleError}, which is invoked when the
	 * 	wait times out or when an error was captured while acquiring the subscription
	 * @throws InterruptedException if the calling thread is interrupted while waiting on the latch
	 */
	private FanOutShardSubscription openSubscriptionToShard(final StartingPosition startingPosition) throws FanOutSubscriberException, InterruptedException {
		final SubscribeToShardRequest subscribeRequest = SubscribeToShardRequest.builder()
			.consumerARN(consumerArn)
			.shardId(shardId)
			.startingPosition(startingPosition)
			.build();

		// Holds the first error reported while the subscription is still being acquired
		final AtomicReference<Throwable> acquisitionFailure = new AtomicReference<>();
		final CountDownLatch subscriptionLatch = new CountDownLatch(1);
		final FanOutShardSubscription shardSubscription = new FanOutShardSubscription(subscriptionLatch);

		final SubscribeToShardResponseHandler handler = SubscribeToShardResponseHandler
			.builder()
			.onError(error -> {
				// Only errors raised while acquiring the subscription are recorded (and later thrown) here.
				// Errors raised during an open subscription also reach the FanOutShardSubscription,
				//	so they are ignored by this callback once the latch has been released.
				if (subscriptionLatch.getCount() == 0) {
					return;
				}

				acquisitionFailure.set(error);
				subscriptionLatch.countDown();
			})
			.subscriber(() -> shardSubscription)
			.build();

		kinesis.subscribeToShard(subscribeRequest, handler);

		final long timeoutMillis = subscribeToShardTimeout.toMillis();
		final boolean timedOut = !subscriptionLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);

		if (timedOut) {
			final String errorMessage = "Timed out acquiring subscription - " + shardId + " (" + consumerArn + ")";
			LOG.error(errorMessage);
			shardSubscription.cancelSubscription();
			handleError(new RecoverableFanOutSubscriberException(new TimeoutException(errorMessage)));
		}

		final Throwable failure = acquisitionFailure.get();
		if (failure != null) {
			handleError(failure);
		}

		LOG.debug("Acquired subscription - {} ({})", shardId, consumerArn);

		// Kick off consumption by requesting the first record from this thread;
		// subsequent requests are made by the FanOutShardSubscription on the netty thread
		shardSubscription.requestRecord();

		return shardSubscription;
	}