in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/RecoverableErrorsConfig.java [43:84]
/**
 * Builds a {@link RecoverableErrorsConfig} from indexed entries in the given properties.
 *
 * <p>Entries are read from keys of the form {@code <prefix>[<idx>].exception}, where the prefix
 * is {@link ConsumerConfigConstants#RECOVERABLE_EXCEPTIONS_PREFIX} and {@code idx} starts at 0
 * and increases by one. Reading stops at the first index that has no key, so entries after a
 * gap in the numbering are not read.
 *
 * @param config properties to read the recoverable exception class names from
 * @return a config holding every exception class that was read, or {@link Optional#empty()}
 *     when no key in {@code config} starts with the recoverable-exceptions prefix
 * @throws IllegalArgumentException if a configured class cannot be loaded, is not a {@link
 *     Throwable}, has no string value, or if keys with the prefix exist but none matches the
 *     expected {@code [0].exception} form
 */
public static Optional<RecoverableErrorsConfig> createConfigFromPropertiesOrThrow(
        final Properties config) {
    final List<ExceptionConfig> exConfs = new ArrayList<>();
    for (int idx = 0; ; idx++) {
        final String exceptionConfigKey =
                String.format(
                        "%s[%d].exception",
                        ConsumerConfigConstants.RECOVERABLE_EXCEPTIONS_PREFIX, idx);
        if (!config.containsKey(exceptionConfigKey)) {
            break;
        }
        final String exPath = config.getProperty(exceptionConfigKey);
        if (exPath == null) {
            // containsKey() matched, but getProperty() only returns String values.
            throw new IllegalArgumentException(
                    "Provided recoverable exception class name is not a string for key: "
                            + exceptionConfigKey);
        }
        final Class<?> aClass;
        try {
            aClass = Class.forName(exPath);
        } catch (ClassNotFoundException e) {
            // Keep the cause so the class-loading failure stays visible in the stack trace.
            throw new IllegalArgumentException(
                    "Provided recoverable exception class could not be found: " + exPath, e);
        }
        if (!Throwable.class.isAssignableFrom(aClass)) {
            throw new IllegalArgumentException(
                    "Provided recoverable exception class is not a Throwable: " + exPath);
        }
        exConfs.add(new ExceptionConfig(aClass));
    }
    if (!exConfs.isEmpty()) {
        // We processed configs successfully
        return Optional.of(new RecoverableErrorsConfig(exConfs));
    }
    // Check if user provided wrong config suffix, so they fail faster
    for (Object key : config.keySet()) {
        // Properties may hold non-String keys; those cannot carry the prefix, so skip them.
        if (key instanceof String
                && ((String) key)
                        .startsWith(ConsumerConfigConstants.RECOVERABLE_EXCEPTIONS_PREFIX)) {
            throw new IllegalArgumentException(RecoverableErrorsConfig.INVALID_CONFIG_MESSAGE);
        }
    }
    return Optional.empty();
}