in src/main/java/com/amazonaws/services/kinesis/aggregators/configuration/ExternalConfigurationModel.java [192:358]
/**
 * Builds one {@link ExternalConfigurationModel} per element of the JSON
 * configuration document found at {@code configFilePath}.
 *
 * <p>
 * The path is resolved, in order, as: a file on the local filesystem, a
 * classpath resource, an Amazon S3 object (when the path starts with
 * {@code s3://}), and finally as a URL. Remote sources are copied to a
 * temporary file before being parsed.
 *
 * <p>
 * For every section the following keys are read: {@code namespace},
 * {@code dateFormat}, {@code labelItems}, {@code labelAttributeAlias},
 * {@code dateItem}, {@code dateAttributeAlias}, {@code summaryItems},
 * {@code tableName}, {@code readIOPS}, {@code writeIOPS},
 * {@code failOnDataExtraction}, {@code emitMetrics},
 * {@code metricsEmitterClass}, {@code IDataStore} and {@code dataExtractor};
 * time horizons, aggregator type and the extractor specific settings are
 * delegated to the corresponding helper methods of this class.
 *
 * @param configFilePath filesystem path, classpath resource name,
 *        {@code s3://bucket/key} location or URL of the configuration
 *        document
 * @return the configurations in document order; empty when the document has
 *         no elements
 * @throws InvalidConfigurationException if no configuration file could be
 *         loaded from {@code configFilePath}
 * @throws Exception if the {@code dataExtractor} value is missing or unknown
 *         (the original failure is attached as the cause), or if loading or
 *         parsing the document fails
 */
public static List<ExternalConfigurationModel> buildFromConfig(String configFilePath)
        throws Exception {
    List<ExternalConfigurationModel> response = new ArrayList<>();

    File configFile = resolveConfigFile(configFilePath);

    // if we haven't been able to load a config file, then bail
    if (configFile == null || !configFile.exists()) {
        throw new InvalidConfigurationException(String.format(
                "Unable to Load Config File from %s", configFilePath));
    }

    JsonNode document = StreamAggregatorUtils.asJsonNode(configFile);

    for (JsonNode section : document) {
        ExternalConfigurationModel config = new ExternalConfigurationModel();

        // set generic properties
        config.setNamespace(StreamAggregatorUtils.readValueAsString(section, "namespace"));
        config.setDateFormat(StreamAggregatorUtils.readValueAsString(section, "dateFormat"));
        addTimeHorizons(section, config);
        setAggregatorType(section, config);

        // set the label items
        JsonNode labelItems = StreamAggregatorUtils.readJsonValue(section, "labelItems");
        if (labelItems != null) {
            for (JsonNode labelItem : labelItems) {
                config.addLabelItems(labelItem.asText());
            }
        }
        config.setLabelAttributeAlias(StreamAggregatorUtils.readValueAsString(section,
                "labelAttributeAlias"));

        config.setDateItem(StreamAggregatorUtils.readValueAsString(section, "dateItem"));
        config.setDateAttributeAlias(StreamAggregatorUtils.readValueAsString(section,
                "dateAttributeAlias"));

        JsonNode summaryItems = StreamAggregatorUtils.readJsonValue(section, "summaryItems");
        if (summaryItems != null) {
            for (JsonNode summaryItem : summaryItems) {
                config.addSummaryItem(summaryItem.asText());
            }
        }

        config.setTableName(StreamAggregatorUtils.readValueAsString(section, "tableName"));

        String readIO = StreamAggregatorUtils.readValueAsString(section, "readIOPS");
        if (readIO != null) {
            config.setReadIOPs(Long.parseLong(readIO));
        }
        String writeIO = StreamAggregatorUtils.readValueAsString(section, "writeIOPS");
        if (writeIO != null) {
            config.setWriteIOPs(Long.parseLong(writeIO));
        }

        // configure tolerance of data extraction problems
        String failOnDataExtraction = StreamAggregatorUtils.readValueAsString(section,
                "failOnDataExtraction");
        if (failOnDataExtraction != null) {
            config.setFailOnDataExtraction(Boolean.parseBoolean(failOnDataExtraction));
        }

        // configure whether metrics should be emitted; an explicit emitter
        // class takes precedence over the plain emitMetrics flag
        String emitMetrics = StreamAggregatorUtils.readValueAsString(section, "emitMetrics");
        String metricsEmitterClassname = StreamAggregatorUtils.readValueAsString(section,
                "metricsEmitterClass");
        if (metricsEmitterClassname != null) {
            // unchecked: the class name comes from the configuration and its
            // type is not verified here
            @SuppressWarnings("unchecked")
            Class<IMetricsEmitter> metricsEmitter = (Class<IMetricsEmitter>) ClassLoader.getSystemClassLoader().loadClass(
                    metricsEmitterClassname);
            config.setMetricsEmitter(metricsEmitter);
        } else if (emitMetrics != null) {
            config.setEmitMetrics(Boolean.parseBoolean(emitMetrics));
        }

        // configure the data store class
        String dataStoreClass = StreamAggregatorUtils.readValueAsString(section, "IDataStore");
        if (dataStoreClass != null) {
            // unchecked: see the note on the metrics emitter class above
            @SuppressWarnings("unchecked")
            Class<IDataStore> dataStore = (Class<IDataStore>) ClassLoader.getSystemClassLoader().loadClass(
                    dataStoreClass);
            config.setDataStore(dataStore);
        }

        // get the data extractor configuration, so we know what other json
        // elements to retrieve from the configuration document
        String useExtractor = null;
        try {
            useExtractor = StreamAggregatorUtils.readValueAsString(section, "dataExtractor");
            config.setDataExtractor(DataExtractor.valueOf(useExtractor));
        } catch (Exception e) {
            // keep the underlying failure so the root cause is not lost
            throw new Exception(String.format(
                    "Unable to configure aggregator with Data Extractor %s", useExtractor), e);
        }

        switch (config.getDataExtractor()) {
            case CSV:
                configureStringCommon(section, config);
                configureCsv(section, config);
                break;
            case JSON:
                configureStringCommon(section, config);
                break;
            case OBJECT:
                configureObject(section, config);
                break;
            case REGEX:
                configureRegex(section, config);
                break;
        }

        response.add(config);
    }

    return response;
}

/**
 * Locates the configuration document and makes it available as a local
 * file, trying the filesystem, the classpath, Amazon S3 and a URL in turn.
 *
 * @param configFilePath the location given to
 *        {@link #buildFromConfig(String)}
 * @return the local file holding the configuration, or {@code null} when the
 *         URL fallback failed with an {@link IOException}
 * @throws Exception if the classpath resource cannot be turned into a file
 *         or the S3 download fails
 */
private static File resolveConfigFile(String configFilePath) throws Exception {
    // reference the config file as a full path
    File localFile = new File(configFilePath);
    if (localFile.exists()) {
        LOG.info(String.format("Loaded Configuration from Filesystem %s", configFilePath));
        return localFile;
    }

    // try to load the file from the classpath; the stream is only used as a
    // probe, so close it straight away
    boolean onClasspath;
    try (InputStream classpathConfig = ExternalConfigurationModel.class.getClassLoader().getResourceAsStream(
            configFilePath)) {
        onClasspath = classpathConfig != null && classpathConfig.available() > 0;
    }
    if (onClasspath) {
        File classpathFile = new File(ExternalConfigurationModel.class.getResource(
                (configFilePath.startsWith("/") ? "" : "/") + configFilePath).toURI());
        LOG.info(String.format("Loaded Configuration %s from Classpath", configFilePath));
        return classpathFile;
    }

    // A fixed prefix is used for temp files: File.createTempFile rejects
    // prefixes shorter than three characters, which a raw path may be.
    final String tempPrefix = "aggregator-config-";

    if (configFilePath.startsWith("s3://")) {
        // parse the config path to get the bucket name and prefix
        String withoutScheme = configFilePath.substring("s3://".length());
        int firstSlash = withoutScheme.indexOf('/');
        String bucket = firstSlash < 0 ? withoutScheme : withoutScheme.substring(0, firstSlash);
        String prefix = firstSlash < 0 ? "" : withoutScheme.substring(firstSlash + 1);

        AmazonS3 s3Client = new AmazonS3Client(new DefaultAWSCredentialsProviderChain());
        TransferManager tm = new TransferManager(s3Client);
        File s3File = File.createTempFile(tempPrefix, null);
        try {
            // download the file using TransferManager
            Download download = tm.download(bucket, prefix, s3File);
            download.waitForCompletion();
        } finally {
            // shut down the transfer manager, even when the download failed
            tm.shutdownNow();
        }
        LOG.info(String.format("Loaded Configuration from Amazon S3 %s/%s to %s",
                bucket, prefix, s3File.getAbsolutePath()));
        return s3File;
    }

    // load the file from external URL
    File urlFile = null;
    try {
        urlFile = File.createTempFile(tempPrefix, null);
        FileUtils.copyURLToFile(new URL(configFilePath), urlFile, 1000, 1000);
        LOG.info(String.format("Loaded Configuration from %s to %s",
                configFilePath, urlFile.getAbsolutePath()));
        return urlFile;
    } catch (IOException e) {
        // Timeouts, malformed URLs and the like are reported by the caller's
        // generalised "config file not found" handling. Remove the empty
        // temp file so that it is not mistaken for a loaded configuration.
        if (urlFile != null && !urlFile.delete()) {
            urlFile.deleteOnExit();
        }
        LOG.warn(String.format("Unable to load Configuration from %s", configFilePath), e);
        return null;
    }
}