public static void validateEfoConfiguration()

in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java [370:418]


    public static void validateEfoConfiguration(Properties config, List<String> streams) {
        EFORegistrationType efoRegistrationType;
        if (config.containsKey(ConsumerConfigConstants.EFO_REGISTRATION_TYPE)) {
            String typeInString = config.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE);
            // specified efo registration type in stream must be either LAZY, EAGER or NONE.
            try {
                efoRegistrationType = EFORegistrationType.valueOf(typeInString);
            } catch (IllegalArgumentException e) {
                String errorMessage =
                        Arrays.stream(EFORegistrationType.values())
                                .map(Enum::name)
                                .collect(Collectors.joining(", "));
                throw new IllegalArgumentException(
                        "Invalid efo registration type in stream set in config. Valid values are: "
                                + errorMessage);
            }
        } else {
            efoRegistrationType = EFORegistrationType.LAZY;
        }
        if (efoRegistrationType == EFORegistrationType.NONE) {
            // if the registration type is NONE, then for each stream there must be an according
            // consumer ARN
            List<String> missingConsumerArnKeys = new ArrayList<>();
            for (String stream : streams) {
                String efoConsumerARNKey =
                        ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX + "." + stream;
                if (!config.containsKey(efoConsumerARNKey)) {
                    missingConsumerArnKeys.add(efoConsumerARNKey);
                }
            }
            if (!missingConsumerArnKeys.isEmpty()) {
                String errorMessage =
                        Arrays.stream(missingConsumerArnKeys.toArray())
                                .map(Object::toString)
                                .collect(Collectors.joining(", "));
                throw new IllegalArgumentException(
                        "Invalid efo consumer arn settings for not providing consumer arns: "
                                + errorMessage);
            }
        } else {
            // if the registration type is LAZY or EAGER, then user must provide a self-defined
            // consumer name.
            if (!config.containsKey(ConsumerConfigConstants.EFO_CONSUMER_NAME)) {
                throw new IllegalArgumentException(
                        "No valid enhanced fan-out consumer name is set through "
                                + ConsumerConfigConstants.EFO_CONSUMER_NAME);
            }
        }
    }