public String registerStreamConsumer()

in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/StreamConsumerRegistrar.java [81:117]


    public String registerStreamConsumer(final String stream, final String streamConsumerName)
            throws Exception {
        LOG.debug("Registering stream consumer - {}::{}", stream, streamConsumerName);

        int attempt = 1;

        if (configuration.getEfoRegistrationType() == LAZY) {
            registrationBackoff(configuration, backoff, attempt++);
        }

        DescribeStreamSummaryResponse describeStreamSummaryResponse =
                kinesisProxyV2Interface.describeStreamSummary(stream);
        String streamArn = describeStreamSummaryResponse.streamDescriptionSummary().streamARN();

        LOG.debug("Found stream ARN - {}", streamArn);

        Optional<DescribeStreamConsumerResponse> describeStreamConsumerResponse =
                describeStreamConsumer(streamArn, streamConsumerName);

        if (!describeStreamConsumerResponse.isPresent()) {
            invokeIgnoringResourceInUse(
                    () ->
                            kinesisProxyV2Interface.registerStreamConsumer(
                                    streamArn, streamConsumerName));
        }

        String streamConsumerArn =
                waitForConsumerToBecomeActive(
                        describeStreamConsumerResponse.orElse(null),
                        streamArn,
                        streamConsumerName,
                        attempt);

        LOG.debug("Using stream consumer - {}", streamConsumerArn);

        return streamConsumerArn;
    }