public void activateSubscription()

in flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java [105:190]


    public void activateSubscription() {
        LOG.info(
                "Activating subscription to shard {} with starting position {} for consumer {}.",
                shardId,
                startingPosition,
                consumerArn);
        if (subscriptionActive.get()) {
            LOG.warn("Skipping activation of subscription since it is already active.");
            return;
        }

        // We have to use our own CountDownLatch to wait for subscription to be acquired because
        // subscription event is tracked via the handler.
        CountDownLatch waitForSubscriptionLatch = new CountDownLatch(1);
        shardSubscriber = new FanOutShardSubscriber(waitForSubscriptionLatch);
        SubscribeToShardResponseHandler responseHandler =
                SubscribeToShardResponseHandler.builder()
                        .subscriber(() -> shardSubscriber)
                        .onError(
                                throwable -> {
                                    // Errors that occur when obtaining a subscription are thrown
                                    // here.
                                    // After subscription is acquired, these errors can be ignored.
                                    if (waitForSubscriptionLatch.getCount() > 0) {
                                        terminateSubscription(throwable);
                                        waitForSubscriptionLatch.countDown();
                                    }
                                })
                        .build();

        // We don't need to keep track of the future here because we monitor subscription success
        // using our own CountDownLatch
        kinesis.subscribeToShard(consumerArn, shardId, startingPosition, responseHandler)
                .exceptionally(
                        throwable -> {
                            // If consumer exists and is still activating, we want to countdown.
                            if (ExceptionUtils.findThrowable(
                                            throwable, ResourceInUseException.class)
                                    .isPresent()) {
                                waitForSubscriptionLatch.countDown();
                                return null;
                            }
                            LOG.error(
                                    "Error subscribing to shard {} with starting position {} for consumer {}.",
                                    shardId,
                                    startingPosition,
                                    consumerArn,
                                    throwable);
                            terminateSubscription(throwable);
                            return null;
                        });

        // We have to handle timeout for subscriptions separately because Java 8 does not support a
        // fluent orTimeout() methods on CompletableFuture.
        CompletableFuture.runAsync(
                () -> {
                    try {
                        if (waitForSubscriptionLatch.await(
                                subscriptionTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
                            LOG.info(
                                    "Successfully subscribed to shard {} with starting position {} for consumer {}.",
                                    shardId,
                                    startingPosition,
                                    consumerArn);
                            subscriptionActive.set(true);
                            // Request first batch of records.
                            shardSubscriber.requestRecords();
                        } else {
                            String errorMessage =
                                    "Timeout when subscribing to shard "
                                            + shardId
                                            + " with starting position "
                                            + startingPosition
                                            + " for consumer "
                                            + consumerArn
                                            + ".";
                            LOG.error(errorMessage);
                            terminateSubscription(new TimeoutException(errorMessage));
                        }
                    } catch (InterruptedException e) {
                        LOG.warn("Interrupted while waiting for subscription to complete.", e);
                        terminateSubscription(e);
                        Thread.currentThread().interrupt();
                    }
                });
    }