public static AggregatorGroup buildFromConfig()

in src/main/java/com/amazonaws/services/kinesis/aggregators/factory/ExternallyConfiguredAggregatorFactory.java [48:151]


    public static AggregatorGroup buildFromConfig(String streamName, String applicationName,
            KinesisClientLibConfiguration config, String configFile) throws Exception {
        List<ExternalConfigurationModel> models = ExternalConfigurationModel.buildFromConfig(configFile);

        if (models.size() == 0) {
            throw new InvalidConfigurationException(String.format(
                    "Unable to build any Aggregators from External Configuration %s", configFile));
        }

        AggregatorGroup aggregators = new AggregatorGroup();
        StreamAggregator agg = null;
        IDataExtractor dataExtractor = null;

        // the configuration may have included many configuration models
        for (ExternalConfigurationModel model : models) {
            switch (model.getDataExtractor()) {
                case CSV:
                    CsvDataExtractor d = new CsvDataExtractor(intList(model.getLabelItems())).withDateValueIndex(
                            Integer.parseInt(model.getDateItem())).withDelimiter(
                            model.getDelimiter()).withItemTerminator(model.getItemTerminator()).withRegexFilter(
                            model.getFilterRegex()).withDateFormat(model.getDateFormat()).withStringSummaryIndicies(
                            model.getSummaryItems());

                    if (model.getLabelAttributeAlias() != null) {
                        d.withLabelAttributeAlias(model.getLabelAttributeAlias());
                    }
                    if (model.getDateAttributeAlias() != null) {
                        d.withDateAttributeAlias(model.getDateAttributeAlias());
                    }

                    dataExtractor = d;
                    break;
                case REGEX:
                    RegexDataExtractor e = new RegexDataExtractor(model.getRegularExpression(),
                            intList(model.getLabelItems())).withItemTerminator(
                            model.getItemTerminator()).withDateValueIndex(
                            Integer.parseInt(model.getDateItem())).withDateFormat(
                            model.getDateFormat()).withStringSummaryIndicies(
                            model.getSummaryItems());
                    if (model.getLabelAttributeAlias() != null) {
                        e.withLabelAttributeAlias(model.getLabelAttributeAlias());
                    }
                    if (model.getDateAttributeAlias() != null) {
                        e.withDateAttributeAlias(model.getDateAttributeAlias());
                    }

                    dataExtractor = e;
                    break;
                case JSON:
                    dataExtractor = new JsonDataExtractor(model.getLabelItems())
                            .withDateFormat(model.getDateFormat())
                            .withDateValueAttribute(model.getDateItem())
                            .withSummaryAttributes(model.getSummaryItems())
                            .withItemTerminator(model.getItemTerminator())
                            .withRegexFilter(model.getFilterRegex())
                    ;
                    break;
                case OBJECT:
                    ObjectExtractor extractor = null;
                    if (model.isAnnotatedClass()) {
                        extractor = new ObjectExtractor(model.getClazz());
                    } else {
                        extractor = new ObjectExtractor(model.getLabelItems(), model.getClazz());
                    }

                    extractor.withDateMethod(model.getDateItem()).withSummaryMethods(
                            model.getSummaryItems());
                    dataExtractor = extractor;
                    break;

            }

            dataExtractor.setAggregatorType(model.getAggregatorType());

            agg = new StreamAggregator(streamName, applicationName, model.getNamespace(), config,
                    dataExtractor).withAggregatorType(model.getAggregatorType()).withStorageCapacity(
                    model.getReadIOPs(), model.getWriteIOPs()).withTableName(model.getTableName()).withTimeHorizon(
                    model.getTimeHorizons()).withRaiseExceptionOnDataExtractionErrors(
                    model.shouldFailOnDataExtraction());

            // configure metrics service on the aggregator if it's been
            // configured
            if (model.shouldEmitMetrics() || model.getMetricsEmitter() != null) {
                if (model.getMetricsEmitter() != null) {
                    agg.withMetricsEmitter(model.getMetricsEmitter().newInstance());
                } else {
                    agg.withCloudWatchMetrics();
                }
            }

            // create a new instance of the Data Store if one has been
            // configured. Currently we only support pluggable data stores that
            // are configured via their environment or have self defined
            // configuration models: only no args public constructors can be
            // called
            if (model.getDataStore() != null) {
                agg.withDataStore((IDataStore) model.getDataStore().newInstance());
            }

            aggregators.registerAggregator(agg);
        }

        return aggregators;
    }