in src/main/java/com/amazonaws/services/kinesis/aggregators/app/AggregatorsBeanstalkApp.java [51:151]
public void contextInitialized(ServletContextEvent contextEvent) {
String configPath = System.getProperty(AggregatorsConstants.CONFIG_URL_PARAM);
if (configPath != null && !configPath.equals("")) {
LOG.info("Starting Managed Beanstalk Aggregators Worker");
String streamNameParam = System.getProperty(AggregatorsConstants.STREAM_NAME_PARAM);
String appNameParam = System.getProperty(AggregatorsConstants.APP_NAME_PARAM);
String regionNameParam = System.getProperty(AggregatorsConstants.REGION_PARAM);
String streamPosParam = System.getProperty(AggregatorsConstants.STREAM_POSITION_PARAM);
String maxRecordsParam = System.getProperty(AggregatorsConstants.MAX_RECORDS_PARAM);
String environmentParam = System.getProperty(AggregatorsConstants.ENVIRONMENT_PARAM);
String failuresToleratedParam = System.getProperty(AggregatorsConstants.FAILURES_TOLERATED_PARAM);
if (streamNameParam == null || streamNameParam.equals("") || appNameParam == null
|| appNameParam.equals("")) {
LOG.error(String.format(
"Unable to run Beanstalk Managed Aggregator Consumer without Configuration of Parameters %s and %s. Application is Idle.",
AggregatorsConstants.STREAM_NAME_PARAM, AggregatorsConstants.APP_NAME_PARAM));
return;
}
InitialPositionInStream initialPosition = null;
if (streamPosParam != null) {
try {
initialPosition = InitialPositionInStream.valueOf(streamPosParam);
LOG.info(String.format("Starting from %s Position in Stream", streamPosParam));
} catch (Exception e) {
LOG.error(String.format("%s is an invalid Initial Position in Stream",
streamPosParam));
return;
}
}
try {
AggregatorConsumer consumer = new AggregatorConsumer(streamNameParam, appNameParam,
configPath);
// add consumer parameters, if set from System Properties
if (regionNameParam != null && !regionNameParam.equals("")) {
consumer.withRegionName(regionNameParam);
}
if (initialPosition != null) {
consumer.withInitialPositionInStream(initialPosition.name());
}
if (maxRecordsParam != null && !maxRecordsParam.equals("")) {
consumer.withMaxRecords(Integer.parseInt(maxRecordsParam));
}
if (environmentParam != null && !environmentParam.equals("")) {
consumer.withEnvironment(environmentParam);
}
if (failuresToleratedParam != null && !failuresToleratedParam.equals("")) {
consumer.withToleratedWorkerFailures(Integer.parseInt(failuresToleratedParam));
}
// configure the consumer so that the aggregators get
// instantiated
consumer.configure();
AggregatorGroup aggGroup = consumer.getAggregators();
// put the aggregator group reference and configureation
// references into the application context
contextEvent.getServletContext().setAttribute(AGGREGATOR_GROUP_PARAM, aggGroup);
contextEvent.getServletContext().setAttribute(
AggregatorsConstants.STREAM_NAME_PARAM, streamNameParam);
LOG.info("Registered Stream and Aggregator Group with Servlet Context");
// start the consumer
final class ConsumerRunner implements Runnable {
final AggregatorConsumer consumer;
public ConsumerRunner(AggregatorConsumer consumer) {
this.consumer = consumer;
}
@Override
public void run() {
try {
consumer.run();
} catch (Exception e) {
e.printStackTrace();
LOG.error(e);
}
}
}
t = new Thread(new ConsumerRunner(consumer));
t.start();
} catch (Exception e) {
LOG.error(e);
}
} else {
LOG.warn(String.format(
"No Aggregators Configuration File found in Beanstalk Configuration %s. Application is Idle",
AggregatorsConstants.CONFIG_URL_PARAM));
}
}