public static List buildFromConfig()

in src/main/java/com/amazonaws/services/kinesis/aggregators/configuration/ExternalConfigurationModel.java [192:358]


    public static List<ExternalConfigurationModel> buildFromConfig(String configFilePath)
            throws Exception {
        List<ExternalConfigurationModel> response = new ArrayList<>();

        // reference the config file as a full path
        File configFile = new File(configFilePath);
        if (!configFile.exists()) {

            // try to load the file from the classpath
            InputStream classpathConfig = ExternalConfigurationModel.class.getClassLoader().getResourceAsStream(
                    configFilePath);
            if (classpathConfig != null && classpathConfig.available() > 0) {
                configFile = new File(ExternalConfigurationModel.class.getResource(
                        (configFilePath.startsWith("/") ? "" : "/") + configFilePath).toURI());

                LOG.info(String.format("Loaded Configuration %s from Classpath", configFilePath));
            } else {
                if (configFilePath.startsWith("s3://")) {
                    AmazonS3 s3Client = new AmazonS3Client(new DefaultAWSCredentialsProviderChain());
                    TransferManager tm = new TransferManager(s3Client);

                    // parse the config path to get the bucket name and prefix
                    final String s3ProtoRegex = "s3:\\/\\/";
                    String bucket = configFilePath.replaceAll(s3ProtoRegex, "").split("/")[0];
                    String prefix = configFilePath.replaceAll(
                            String.format("%s%s\\/", s3ProtoRegex, bucket), "");

                    // download the file using TransferManager
                    configFile = File.createTempFile(configFilePath, null);
                    Download download = tm.download(bucket, prefix, configFile);
                    download.waitForCompletion();

                    // shut down the transfer manager
                    tm.shutdownNow();

                    LOG.info(String.format("Loaded Configuration from Amazon S3 %s/%s to %s",
                            bucket, prefix, configFile.getAbsolutePath()));
                } else {
                    // load the file from external URL
                    try {
                        configFile = File.createTempFile(configFilePath, null);
                        FileUtils.copyURLToFile(new URL(configFilePath), configFile, 1000, 1000);
                        LOG.info(String.format("Loaded Configuration from %s to %s",
                                configFilePath, configFile.getAbsolutePath()));
                    } catch (IOException e) {
                        // handle the timeouts and so on with a generalised
                        // config
                        // file not found handler later
                    }
                }
            }
        } else {
            LOG.info(String.format("Loaded Configuration from Filesystem %s", configFilePath));
        }

        // if we haven't been able to load a config file, then bail
        if (configFile == null || !configFile.exists()) {
            throw new InvalidConfigurationException(String.format(
                    "Unable to Load Config File from %s", configFilePath));
        }

        JsonNode document = StreamAggregatorUtils.asJsonNode(configFile);

        ExternalConfigurationModel config = null;

        Iterator<JsonNode> i = document.elements();
        while (i.hasNext()) {
            config = new ExternalConfigurationModel();

            JsonNode section = i.next();

            // set generic properties
            config.setNamespace(StreamAggregatorUtils.readValueAsString(section, "namespace"));
            config.setDateFormat(StreamAggregatorUtils.readValueAsString(section, "dateFormat"));
            addTimeHorizons(section, config);
            setAggregatorType(section, config);

            // set the label items
            JsonNode labelItems = StreamAggregatorUtils.readJsonValue(section, "labelItems");
            if (labelItems != null && labelItems.size() > 0) {
                Iterator<JsonNode> iterator = labelItems.elements();
                while (iterator.hasNext()) {
                    JsonNode n = iterator.next();
                    config.addLabelItems(n.asText());
                }
            }
            config.setLabelAttributeAlias(StreamAggregatorUtils.readValueAsString(section,
                    "labelAttributeAlias"));

            config.setDateItem(StreamAggregatorUtils.readValueAsString(section, "dateItem"));
            config.setDateAttributeAlias(StreamAggregatorUtils.readValueAsString(section,
                    "dateAttributeAlias"));
            JsonNode summaryItems = StreamAggregatorUtils.readJsonValue(section, "summaryItems");
            if (summaryItems != null && summaryItems.size() > 0) {
                Iterator<JsonNode> iterator = summaryItems.elements();
                while (iterator.hasNext()) {
                    JsonNode n = iterator.next();
                    config.addSummaryItem(n.asText());
                }
            }

            config.setTableName(StreamAggregatorUtils.readValueAsString(section, "tableName"));

            String readIO = StreamAggregatorUtils.readValueAsString(section, "readIOPS");
            if (readIO != null)
                config.setReadIOPs(Long.parseLong(readIO));
            String writeIO = StreamAggregatorUtils.readValueAsString(section, "writeIOPS");
            if (writeIO != null)
                config.setWriteIOPs(Long.parseLong(writeIO));

            // configure tolerance of data extraction problems
            String failOnDataExtraction = StreamAggregatorUtils.readValueAsString(section,
                    "failOnDataExtraction");
            if (failOnDataExtraction != null)
                config.setFailOnDataExtraction(Boolean.parseBoolean(failOnDataExtraction));

            // configure whether metrics should be emitted
            String emitMetrics = StreamAggregatorUtils.readValueAsString(section, "emitMetrics");
            String metricsEmitterClassname = StreamAggregatorUtils.readValueAsString(section,
                    "metricsEmitterClass");
            if (emitMetrics != null || metricsEmitterClassname != null) {
                if (metricsEmitterClassname != null) {
                    config.setMetricsEmitter((Class<IMetricsEmitter>) ClassLoader.getSystemClassLoader().loadClass(
                            metricsEmitterClassname));
                } else {
                    config.setEmitMetrics(Boolean.parseBoolean(emitMetrics));
                }
            }

            // configure the data store class
            String dataStoreClass = StreamAggregatorUtils.readValueAsString(section, "IDataStore");
            if (dataStoreClass != null) {
                Class<IDataStore> dataStore = (Class<IDataStore>) ClassLoader.getSystemClassLoader().loadClass(
                        dataStoreClass);
                config.setDataStore(dataStore);
            }

            // get the data extractor configuration, so we know what other json
            // elements to retrieve from the configuration document
            String useExtractor = null;
            try {
                useExtractor = StreamAggregatorUtils.readValueAsString(section, "dataExtractor");
                config.setDataExtractor(DataExtractor.valueOf(useExtractor));
            } catch (Exception e) {
                throw new Exception(String.format(
                        "Unable to configure aggregator with Data Extractor %s", useExtractor));
            }

            switch (config.getDataExtractor()) {
                case CSV:
                    configureStringCommon(section, config);
                    configureCsv(section, config);
                    break;
                case JSON:
                    configureStringCommon(section, config);
                    break;
                case OBJECT:
                    configureObject(section, config);
                    break;
                case REGEX:
                    configureRegex(section, config);
            }

            response.add(config);
        }
        return response;
    }