in flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/StreamConsumerRegistrar.java [66:109]
/**
 * Prepares the enhanced fan-out (EFO) stream consumer according to the configured lifecycle.
 *
 * <p>Returns immediately when the configured reader type is not EFO. Otherwise the configured
 * EFO consumer name must pass the {@code Preconditions} checks (present and non-empty) before
 * the lifecycle is inspected:
 *
 * <ul>
 *   <li>{@code JOB_MANAGED}: the consumer is registered through the stream proxy and the ARN
 *       from the response is stored in {@code consumerArn}.
 *   <li>{@code SELF_MANAGED}: the consumer is described through the stream proxy and the
 *       response is logged.
 * </ul>
 *
 * @throws IllegalArgumentException if the configured lifecycle is neither of the above
 */
public void registerStreamConsumer() {
    if (sourceConfig.get(READER_TYPE) != EFO) {
        return;
    }

    final String consumerName = sourceConfig.get(EFO_CONSUMER_NAME);
    Preconditions.checkNotNull(
            consumerName, "For EFO reader type, EFO consumer name must be specified.");
    Preconditions.checkArgument(
            !consumerName.isEmpty(), "For EFO reader type, EFO consumer name cannot be empty.");

    switch (sourceConfig.get(EFO_CONSUMER_LIFECYCLE)) {
        case JOB_MANAGED:
            registerJobManagedConsumer(consumerName);
            return;
        case SELF_MANAGED:
            describeSelfManagedConsumer(consumerName);
            return;
        default:
            throw new IllegalArgumentException(
                    "Unsupported EFO consumer lifecycle: "
                            + sourceConfig.get(EFO_CONSUMER_LIFECYCLE));
    }
}

/**
 * Registers {@code consumerName} on the stream and remembers the ARN from the response.
 *
 * <p>A {@code ResourceInUseException} is logged at WARN level and not rethrown; any other
 * exception from the proxy propagates to the caller.
 */
private void registerJobManagedConsumer(String consumerName) {
    try {
        LOG.info("Registering stream consumer - {}::{}", streamArn, consumerName);
        final String registeredArn =
                kinesisStreamProxy
                        .registerStreamConsumer(streamArn, consumerName)
                        .consumer()
                        .consumerARN();
        consumerArn = registeredArn;
        LOG.info("Registered stream consumer - {}::{}", streamArn, registeredArn);
    } catch (ResourceInUseException alreadyRegistered) {
        // NOTE(review): consumerArn is not assigned on this path - confirm that later users
        // of the field tolerate it being unset when the consumer already existed.
        LOG.warn(
                "Found existing consumer {} on stream {}. Proceeding to read from consumer.",
                consumerName,
                streamArn,
                alreadyRegistered);
    }
}

/**
 * Describes {@code consumerName} on the stream and logs the response.
 *
 * <p>The lookup is done up front so that the job fails fast if the requested EFO consumer
 * does not exist.
 */
private void describeSelfManagedConsumer(String consumerName) {
    LOG.info(
            "Discovered stream consumer - {}",
            kinesisStreamProxy.describeStreamConsumer(streamArn, consumerName));
}