in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/StreamConsumerRegistrar.java [81:117]
/**
 * Resolves the ARN of the named consumer on the given stream, registering the consumer first
 * when the describe call reports that none exists yet.
 *
 * <p>Order of operations:
 *
 * <ol>
 *   <li>When the configured EFO registration type is {@code LAZY}, {@code registrationBackoff}
 *       is invoked once before any service call and the attempt counter is advanced.
 *   <li>The stream ARN is read from the stream summary.
 *   <li>The consumer is described; if no description is returned, a registration request is
 *       issued through {@code invokeIgnoringResourceInUse}.
 *   <li>{@code waitForConsumerToBecomeActive} is called with the description obtained in the
 *       previous step (or {@code null} when there was none) and its result is returned.
 * </ol>
 *
 * @param stream name of the stream the consumer belongs to
 * @param streamConsumerName name of the stream consumer to look up or register
 * @return the value produced by {@code waitForConsumerToBecomeActive}, used as the consumer ARN
 * @throws Exception if any of the delegated calls fails
 */
public String registerStreamConsumer(final String stream, final String streamConsumerName)
        throws Exception {
    LOG.debug("Registering stream consumer - {}::{}", stream, streamConsumerName);

    int attempt = 1;
    if (LAZY == configuration.getEfoRegistrationType()) {
        // The up-front backoff counts as the first attempt, so later waits start from 2.
        registrationBackoff(configuration, backoff, attempt);
        attempt++;
    }

    final String streamArn =
            kinesisProxyV2Interface
                    .describeStreamSummary(stream)
                    .streamDescriptionSummary()
                    .streamARN();
    LOG.debug("Found stream ARN - {}", streamArn);

    // null stands for "no consumer description available"; the same value is handed to the
    // wait step below.
    final DescribeStreamConsumerResponse existingConsumer =
            describeStreamConsumer(streamArn, streamConsumerName).orElse(null);

    if (existingConsumer == null) {
        invokeIgnoringResourceInUse(
                () -> kinesisProxyV2Interface.registerStreamConsumer(streamArn, streamConsumerName));
    }

    final String activeConsumerArn =
            waitForConsumerToBecomeActive(
                    existingConsumer, streamArn, streamConsumerName, attempt);
    LOG.debug("Using stream consumer - {}", activeConsumerArn);

    return activeConsumerArn;
}