private String waitForConsumerToBecomeActive()

in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/StreamConsumerRegistrar.java [184:216]


    /**
     * Polls {@code describeStreamConsumer} until the stream consumer reports {@code ACTIVE} and
     * returns its ARN.
     *
     * <p>If the supplied response is already {@code ACTIVE}, no polling is performed. Otherwise
     * each iteration invokes {@code registrationBackoff} (presumably a sleep whose length depends
     * on the attempt number; confirm against that helper) and then describes the consumer again.
     * The configured register-stream-consumer timeout is measured from the moment this method is
     * entered and is only enforced while the consumer is still not active: a poll that observes
     * {@code ACTIVE} always wins over the deadline, so a consumer that became active on the final
     * poll is not reported as a timeout.
     *
     * @param describeStreamConsumerResponse a previously obtained describe response, or {@code
     *     null} if none is available yet
     * @param streamArn the ARN of the stream the consumer is registered on
     * @param streamConsumerName the name of the stream consumer to wait for
     * @param initialAttempt the attempt number passed to the first backoff; incremented after
     *     every backoff
     * @return the ARN of the consumer once it is {@code ACTIVE}
     * @throws FlinkKinesisTimeoutException if the consumer is still not active once the configured
     *     timeout has elapsed
     * @throws InterruptedException declared by this method; presumably raised when the backoff is
     *     interrupted
     * @throws ExecutionException declared by this method; presumably raised when the describe
     *     call fails
     */
    private String waitForConsumerToBecomeActive(
            @Nullable final DescribeStreamConsumerResponse describeStreamConsumerResponse,
            final String streamArn,
            final String streamConsumerName,
            final int initialAttempt)
            throws InterruptedException, ExecutionException {
        int attempt = initialAttempt;

        final Instant start = Instant.now();
        final Duration timeout = configuration.getRegisterStreamConsumerTimeout();

        DescribeStreamConsumerResponse response = describeStreamConsumerResponse;
        while (response == null || response.consumerDescription().consumerStatus() != ACTIVE) {
            LOG.debug(
                    "Waiting for stream consumer to become active, attempt {} - {} on {}",
                    attempt,
                    streamConsumerName,
                    streamArn);
            registrationBackoff(configuration, backoff, attempt++);
            response =
                    kinesisProxyV2Interface.describeStreamConsumer(streamArn, streamConsumerName);

            // Only give up if the fresh response is still not ACTIVE. Checking the deadline
            // unconditionally would discard a successful activation that happened to be observed
            // just after the timeout elapsed.
            final boolean stillPending =
                    response == null
                            || response.consumerDescription().consumerStatus() != ACTIVE;
            if (stillPending && Duration.between(start, Instant.now()).compareTo(timeout) > 0) {
                throw new FlinkKinesisTimeoutException(
                        "Timeout waiting for stream consumer to become active: "
                                + streamConsumerName
                                + " on "
                                + streamArn);
            }
        }

        return response.consumerDescription().consumerARN();
    }