in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/util/KinesisConfigUtil.java [290:327]
/**
 * Validates the enhanced fan-out (EFO) settings of a consumer configuration.
 *
 * <p>The registration type is read from {@link ConsumerConfigConstants#EFO_REGISTRATION_TYPE};
 * when that key is absent, {@link EFORegistrationType#LAZY} is assumed. The checks are:
 * <ul>
 *   <li>if the registration type key is present, its value must be the exact name of an
 *       {@link EFORegistrationType} constant;</li>
 *   <li>for {@link EFORegistrationType#NONE}, the key
 *       {@code ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX + "." + stream} must be present
 *       for every entry of {@code streams};</li>
 *   <li>for any other registration type, the key
 *       {@link ConsumerConfigConstants#EFO_CONSUMER_NAME} must be present.</li>
 * </ul>
 *
 * @param config the consumer configuration properties to validate
 * @param streams the names of the streams whose consumer ARN keys are checked when the
 *                registration type is {@code NONE}
 * @throws IllegalArgumentException if any of the checks above fails
 */
public static void validateEfoConfiguration(Properties config, List<String> streams) {
	EFORegistrationType efoRegistrationType;
	if (config.containsKey(ConsumerConfigConstants.EFO_REGISTRATION_TYPE)) {
		String typeInString = config.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE);
		// specified efo registration type in stream must be either LAZY, EAGER or NONE.
		EFORegistrationType parsedType = null;
		IllegalArgumentException parseFailure = null;
		// getProperty() yields null when the key is present but its value is not a String;
		// passing that null to valueOf() would raise a NullPointerException rather than the
		// descriptive IllegalArgumentException below, so it is treated as an invalid value.
		if (typeInString != null) {
			try {
				parsedType = EFORegistrationType.valueOf(typeInString);
			} catch (IllegalArgumentException e) {
				parseFailure = e;
			}
		}
		if (parsedType == null) {
			String validValues = Arrays.stream(EFORegistrationType.values())
				.map(Enum::name)
				.collect(Collectors.joining(", "));
			// Keep the original failure (if any) as the cause; a null cause is permitted.
			throw new IllegalArgumentException(
				"Invalid efo registration type in stream set in config. Valid values are: " + validValues,
				parseFailure);
		}
		efoRegistrationType = parsedType;
	} else {
		efoRegistrationType = EFORegistrationType.LAZY;
	}

	if (efoRegistrationType == EFORegistrationType.NONE) {
		// if the registration type is NONE, then for each stream there must be an according consumer ARN
		List<String> missingConsumerArnKeys = new ArrayList<>();
		for (String stream : streams) {
			String efoConsumerARNKey = ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX + "." + stream;
			if (!config.containsKey(efoConsumerARNKey)) {
				missingConsumerArnKeys.add(efoConsumerARNKey);
			}
		}
		if (!missingConsumerArnKeys.isEmpty()) {
			// Report every missing key at once so the user can fix them in a single pass.
			throw new IllegalArgumentException(
				"Invalid efo consumer arn settings for not providing consumer arns: "
					+ String.join(", ", missingConsumerArnKeys));
		}
	} else if (!config.containsKey(ConsumerConfigConstants.EFO_CONSUMER_NAME)) {
		// if the registration type is LAZY or EAGER, then user must provide a self-defined consumer name.
		throw new IllegalArgumentException(
			"No valid enhanced fan-out consumer name is set through " + ConsumerConfigConstants.EFO_CONSUMER_NAME);
	}
}