public void contextInitialized()

in src/main/java/com/amazonaws/services/kinesis/aggregators/app/AggregatorsBeanstalkApp.java [51:151]


    public void contextInitialized(ServletContextEvent contextEvent) {
        String configPath = System.getProperty(AggregatorsConstants.CONFIG_URL_PARAM);

        if (configPath != null && !configPath.equals("")) {
            LOG.info("Starting Managed Beanstalk Aggregators Worker");
            String streamNameParam = System.getProperty(AggregatorsConstants.STREAM_NAME_PARAM);
            String appNameParam = System.getProperty(AggregatorsConstants.APP_NAME_PARAM);
            String regionNameParam = System.getProperty(AggregatorsConstants.REGION_PARAM);
            String streamPosParam = System.getProperty(AggregatorsConstants.STREAM_POSITION_PARAM);
            String maxRecordsParam = System.getProperty(AggregatorsConstants.MAX_RECORDS_PARAM);
            String environmentParam = System.getProperty(AggregatorsConstants.ENVIRONMENT_PARAM);
            String failuresToleratedParam = System.getProperty(AggregatorsConstants.FAILURES_TOLERATED_PARAM);

            if (streamNameParam == null || streamNameParam.equals("") || appNameParam == null
                    || appNameParam.equals("")) {
                LOG.error(String.format(
                        "Unable to run Beanstalk Managed Aggregator Consumer without Configuration of Parameters %s and %s. Application is Idle.",
                        AggregatorsConstants.STREAM_NAME_PARAM, AggregatorsConstants.APP_NAME_PARAM));
                return;
            }

            InitialPositionInStream initialPosition = null;
            if (streamPosParam != null) {
                try {
                    initialPosition = InitialPositionInStream.valueOf(streamPosParam);
                    LOG.info(String.format("Starting from %s Position in Stream", streamPosParam));
                } catch (Exception e) {
                    LOG.error(String.format("%s is an invalid Initial Position in Stream",
                            streamPosParam));
                    return;
                }
            }

            try {
                AggregatorConsumer consumer = new AggregatorConsumer(streamNameParam, appNameParam,
                        configPath);

                // add consumer parameters, if set from System Properties
                if (regionNameParam != null && !regionNameParam.equals("")) {
                    consumer.withRegionName(regionNameParam);
                }

                if (initialPosition != null) {
                    consumer.withInitialPositionInStream(initialPosition.name());
                }

                if (maxRecordsParam != null && !maxRecordsParam.equals("")) {
                    consumer.withMaxRecords(Integer.parseInt(maxRecordsParam));
                }

                if (environmentParam != null && !environmentParam.equals("")) {
                    consumer.withEnvironment(environmentParam);
                }

                if (failuresToleratedParam != null && !failuresToleratedParam.equals("")) {
                    consumer.withToleratedWorkerFailures(Integer.parseInt(failuresToleratedParam));
                }

                // configure the consumer so that the aggregators get
                // instantiated
                consumer.configure();

                AggregatorGroup aggGroup = consumer.getAggregators();

                // put the aggregator group reference and configureation
                // references into the application context
                contextEvent.getServletContext().setAttribute(AGGREGATOR_GROUP_PARAM, aggGroup);
                contextEvent.getServletContext().setAttribute(
                        AggregatorsConstants.STREAM_NAME_PARAM, streamNameParam);

                LOG.info("Registered Stream and Aggregator Group with Servlet Context");

                // start the consumer
                final class ConsumerRunner implements Runnable {
                    final AggregatorConsumer consumer;

                    public ConsumerRunner(AggregatorConsumer consumer) {
                        this.consumer = consumer;
                    }

                    @Override
                    public void run() {
                        try {
                            consumer.run();
                        } catch (Exception e) {
                            e.printStackTrace();
                            LOG.error(e);
                        }
                    }
                }
                t = new Thread(new ConsumerRunner(consumer));
                t.start();
            } catch (Exception e) {
                LOG.error(e);
            }
        } else {
            LOG.warn(String.format(
                    "No Aggregators Configuration File found in Beanstalk Configuration %s. Application is Idle",
                    AggregatorsConstants.CONFIG_URL_PARAM));
        }
    }