in samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java [250:286]
private void setKinesisClientLibConfigs(Map<String, String> config, KinesisClientLibConfiguration kinesisLibConfig) {
for (Entry<String, String> entry : config.entrySet()) {
boolean found = false;
String key = entry.getKey();
String value = entry.getValue();
if (StringUtils.isEmpty(value)) {
continue;
}
for (Method method : KinesisClientLibConfiguration.class.getMethods()) {
if (method.getName().equals("with" + key)) {
found = true;
Class<?> type = method.getParameterTypes()[0];
try {
if (type == long.class) {
method.invoke(kinesisLibConfig, Long.valueOf(value));
} else if (type == int.class) {
method.invoke(kinesisLibConfig, Integer.valueOf(value));
} else if (type == boolean.class) {
method.invoke(kinesisLibConfig, Boolean.valueOf(value));
} else if (type == String.class) {
method.invoke(kinesisLibConfig, value);
} else if (type == InitialPositionInStream.class) {
method.invoke(kinesisLibConfig, InitialPositionInStream.valueOf(value.toUpperCase()));
}
LOG.info("Loaded property " + key + " = " + value);
break;
} catch (Exception e) {
throw new IllegalArgumentException(
String.format("Error trying to set field %s with the value '%s'", key, value), e);
}
}
}
if (!found) {
LOG.warn("Property " + key + " ignored as there is no corresponding set method");
}
}
}