public static void validateEfoConfiguration()

in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/util/KinesisConfigUtil.java [290:327]


	public static void validateEfoConfiguration(Properties config, List<String> streams) {
		EFORegistrationType efoRegistrationType;
		if (config.containsKey(ConsumerConfigConstants.EFO_REGISTRATION_TYPE)) {
			String typeInString = config.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE);
			// specified efo registration type in stream must be either LAZY, EAGER or NONE.
			try {
				efoRegistrationType = EFORegistrationType.valueOf(typeInString);
			} catch (IllegalArgumentException e) {
				String errorMessage = Arrays.stream(EFORegistrationType.values())
					.map(Enum::name).collect(Collectors.joining(", "));
				throw new IllegalArgumentException("Invalid efo registration type in stream set in config. Valid values are: " + errorMessage);
			}
		} else {
			efoRegistrationType = EFORegistrationType.LAZY;
		}
		if (efoRegistrationType == EFORegistrationType.NONE) {
			//if the registration type is NONE, then for each stream there must be an according consumer ARN
			List<String> missingConsumerArnKeys = new ArrayList<>();
			for (String stream : streams) {
				String efoConsumerARNKey = ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX + "." + stream;
				if (!config.containsKey(efoConsumerARNKey)) {
					missingConsumerArnKeys.add(efoConsumerARNKey);
				}
			}
			if (!missingConsumerArnKeys.isEmpty()) {
				String errorMessage = Arrays
					.stream(missingConsumerArnKeys.toArray())
					.map(Object::toString)
					.collect(Collectors.joining(", "));
				throw new IllegalArgumentException("Invalid efo consumer arn settings for not providing consumer arns: " + errorMessage);
			}
		} else {
			//if the registration type is LAZY or EAGER, then user must provide a self-defined consumer name.
			if (!config.containsKey(ConsumerConfigConstants.EFO_CONSUMER_NAME)) {
				throw new IllegalArgumentException("No valid enhanced fan-out consumer name is set through " + ConsumerConfigConstants.EFO_CONSUMER_NAME);
			}
		}
	}