public static Optional createConfigFromPropertiesOrThrow()

in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/RecoverableErrorsConfig.java [43:84]


    /**
     * Builds a {@link RecoverableErrorsConfig} from indexed properties of the form {@code
     * <RECOVERABLE_EXCEPTIONS_PREFIX>[<idx>].exception}.
     *
     * <p>Indices are read consecutively starting at 0; reading stops at the first index whose key
     * is absent. Each value must be the fully-qualified name of a class assignable to {@link
     * Throwable}.
     *
     * @param config properties to read the recoverable exception entries from
     * @return a config holding one entry per consecutive index found, or {@link Optional#empty()}
     *     when no key starting with the recoverable-exceptions prefix is present
     * @throws IllegalArgumentException if a configured value is not a String, names a class that
     *     cannot be loaded, names a class that is not a {@link Throwable}, or if no entry exists
     *     at index 0 but some other key starts with the recoverable-exceptions prefix
     */
    public static Optional<RecoverableErrorsConfig> createConfigFromPropertiesOrThrow(
            final Properties config) {
        final List<ExceptionConfig> exConfs = new ArrayList<>();
        for (int idx = 0; ; idx++) {
            final String exceptionConfigKey =
                    String.format(
                            "%s[%d].exception",
                            ConsumerConfigConstants.RECOVERABLE_EXCEPTIONS_PREFIX, idx);
            if (!config.containsKey(exceptionConfigKey)) {
                break;
            }

            // getProperty yields null when the stored value is not a String (e.g. a Class object
            // was put directly); report that as a config error instead of an NPE in forName.
            final String exPath = config.getProperty(exceptionConfigKey);
            if (exPath == null) {
                throw new IllegalArgumentException(
                        "Provided recoverable exception class name is not a String for key: "
                                + exceptionConfigKey);
            }

            try {
                Class<?> aClass = Class.forName(exPath);
                if (!Throwable.class.isAssignableFrom(aClass)) {
                    throw new IllegalArgumentException(
                            "Provided recoverable exception class is not a Throwable: " + exPath);
                }
                exConfs.add(new ExceptionConfig(aClass));
            } catch (ClassNotFoundException e) {
                // Keep the cause so the class-loading failure stays diagnosable.
                throw new IllegalArgumentException(
                        "Provided recoverable exception class could not be found: " + exPath, e);
            }
        }

        // Every completed loop iteration adds exactly one entry, so a non-empty list means at
        // least index 0 was processed successfully.
        if (!exConfs.isEmpty()) {
            return Optional.of(new RecoverableErrorsConfig(exConfs));
        }

        // Check if user provided wrong config suffix, so they fail faster
        for (Object key : config.keySet()) {
            // Properties may hold non-String keys; those cannot match the prefix, so skip them
            // rather than failing on the cast.
            if (key instanceof String
                    && ((String) key)
                            .startsWith(ConsumerConfigConstants.RECOVERABLE_EXCEPTIONS_PREFIX)) {
                throw new IllegalArgumentException(RecoverableErrorsConfig.INVALID_CONFIG_MESSAGE);
            }
        }

        return Optional.empty();
    }