in flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java [105:190]
/**
 * Starts a subscription to {@code shardId} for {@code consumerArn} at {@code startingPosition}.
 *
 * <p>If {@code subscriptionActive} is already set, a warning is logged and nothing else
 * happens. Otherwise a fresh {@code FanOutShardSubscriber} is created around a one-shot latch,
 * {@code kinesis.subscribeToShard(...)} is invoked, and a task handed to {@link
 * CompletableFuture#runAsync(Runnable)} waits on that latch for at most {@code
 * subscriptionTimeout}. The wait happens in that task, not in this method.
 *
 * <p>Outcomes of the wait:
 *
 * <ul>
 *   <li>latch released in time: {@code subscriptionActive} is set to {@code true} and the first
 *       batch of records is requested from the subscriber;
 *   <li>timeout: {@code terminateSubscription} is called with a {@link TimeoutException};
 *   <li>interruption: {@code terminateSubscription} is called with the {@link
 *       InterruptedException} and the thread's interrupt flag is restored.
 * </ul>
 */
public void activateSubscription() {
    LOG.info(
            "Activating subscription to shard {} with starting position {} for consumer {}.",
            shardId,
            startingPosition,
            consumerArn);

    if (subscriptionActive.get()) {
        LOG.warn("Skipping activation of subscription since it is already active.");
        return;
    }

    // The subscription event is observed through the handler, so acquisition is signalled via
    // a latch owned by this method (presumably counted down by the subscriber once the
    // subscription is acquired - confirm in FanOutShardSubscriber).
    final CountDownLatch subscriptionAcquired = new CountDownLatch(1);
    shardSubscriber = new FanOutShardSubscriber(subscriptionAcquired);

    SubscribeToShardResponseHandler responseHandler =
            SubscribeToShardResponseHandler.builder()
                    .subscriber(() -> shardSubscriber)
                    .onError(
                            failure -> {
                                // Only errors raised while the subscription is still being
                                // obtained are handled here; once the latch has been released
                                // they are ignored by this callback.
                                if (subscriptionAcquired.getCount() <= 0) {
                                    return;
                                }
                                terminateSubscription(failure);
                                subscriptionAcquired.countDown();
                            })
                    .build();

    // The returned future is deliberately not retained: success is tracked through the latch.
    kinesis.subscribeToShard(consumerArn, shardId, startingPosition, responseHandler)
            .exceptionally(
                    failure -> {
                        boolean consumerStillActivating =
                                ExceptionUtils.findThrowable(
                                                failure, ResourceInUseException.class)
                                        .isPresent();
                        if (consumerStillActivating) {
                            // Consumer exists but is still activating: just release the latch.
                            subscriptionAcquired.countDown();
                        } else {
                            LOG.error(
                                    "Error subscribing to shard {} with starting position {} for consumer {}.",
                                    shardId,
                                    startingPosition,
                                    consumerArn,
                                    failure);
                            terminateSubscription(failure);
                        }
                        return null;
                    });

    // The timeout is enforced by hand because Java 8's CompletableFuture has no fluent
    // orTimeout().
    // NOTE(review): the error paths above release the same latch, so this task then takes the
    // "successfully subscribed" branch after a failure as well - confirm that is intended.
    Runnable awaitSubscription =
            () -> {
                try {
                    boolean acquired =
                            subscriptionAcquired.await(
                                    subscriptionTimeout.toMillis(), TimeUnit.MILLISECONDS);
                    if (!acquired) {
                        String timeoutMessage =
                                "Timeout when subscribing to shard "
                                        + shardId
                                        + " with starting position "
                                        + startingPosition
                                        + " for consumer "
                                        + consumerArn
                                        + ".";
                        LOG.error(timeoutMessage);
                        terminateSubscription(new TimeoutException(timeoutMessage));
                        return;
                    }

                    LOG.info(
                            "Successfully subscribed to shard {} with starting position {} for consumer {}.",
                            shardId,
                            startingPosition,
                            consumerArn);
                    subscriptionActive.set(true);
                    // Ask for the first batch of records.
                    shardSubscriber.requestRecords();
                } catch (InterruptedException interrupted) {
                    LOG.warn(
                            "Interrupted while waiting for subscription to complete.",
                            interrupted);
                    terminateSubscription(interrupted);
                    // Restore the interrupt flag for whoever owns this thread.
                    Thread.currentThread().interrupt();
                }
            };
    CompletableFuture.runAsync(awaitSubscription);
}