in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/StreamConsumerRegistrar.java [184:216]
private String waitForConsumerToBecomeActive(
@Nullable final DescribeStreamConsumerResponse describeStreamConsumerResponse,
final String streamArn,
final String streamConsumerName,
final int initialAttempt)
throws InterruptedException, ExecutionException {
int attempt = initialAttempt;
Instant start = Instant.now();
Duration timeout = configuration.getRegisterStreamConsumerTimeout();
DescribeStreamConsumerResponse response = describeStreamConsumerResponse;
while (response == null || response.consumerDescription().consumerStatus() != ACTIVE) {
LOG.debug(
"Waiting for stream consumer to become active, attempt {} - {} on {}",
attempt,
streamConsumerName,
streamArn);
registrationBackoff(configuration, backoff, attempt++);
response =
kinesisProxyV2Interface.describeStreamConsumer(streamArn, streamConsumerName);
if (Duration.between(start, Instant.now()).compareTo(timeout) > 0) {
throw new FlinkKinesisTimeoutException(
"Timeout waiting for stream consumer to become active: "
+ streamConsumerName
+ " on "
+ streamArn);
}
}
return response.consumerDescription().consumerARN();
}