in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java [370:418]
public static void validateEfoConfiguration(Properties config, List<String> streams) {
EFORegistrationType efoRegistrationType;
if (config.containsKey(ConsumerConfigConstants.EFO_REGISTRATION_TYPE)) {
String typeInString = config.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE);
// specified efo registration type in stream must be either LAZY, EAGER or NONE.
try {
efoRegistrationType = EFORegistrationType.valueOf(typeInString);
} catch (IllegalArgumentException e) {
String errorMessage =
Arrays.stream(EFORegistrationType.values())
.map(Enum::name)
.collect(Collectors.joining(", "));
throw new IllegalArgumentException(
"Invalid efo registration type in stream set in config. Valid values are: "
+ errorMessage);
}
} else {
efoRegistrationType = EFORegistrationType.LAZY;
}
if (efoRegistrationType == EFORegistrationType.NONE) {
// if the registration type is NONE, then for each stream there must be an according
// consumer ARN
List<String> missingConsumerArnKeys = new ArrayList<>();
for (String stream : streams) {
String efoConsumerARNKey =
ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX + "." + stream;
if (!config.containsKey(efoConsumerARNKey)) {
missingConsumerArnKeys.add(efoConsumerARNKey);
}
}
if (!missingConsumerArnKeys.isEmpty()) {
String errorMessage =
Arrays.stream(missingConsumerArnKeys.toArray())
.map(Object::toString)
.collect(Collectors.joining(", "));
throw new IllegalArgumentException(
"Invalid efo consumer arn settings for not providing consumer arns: "
+ errorMessage);
}
} else {
// if the registration type is LAZY or EAGER, then user must provide a self-defined
// consumer name.
if (!config.containsKey(ConsumerConfigConstants.EFO_CONSUMER_NAME)) {
throw new IllegalArgumentException(
"No valid enhanced fan-out consumer name is set through "
+ ConsumerConfigConstants.EFO_CONSUMER_NAME);
}
}
}