in aws-blog-kinesis-storm-clickstream-app/src/main/java/KinesisStormClickstreamApp/SampleTopology.java [97:148]
/**
 * Loads the given properties file and applies any settings it contains on top of the
 * current configuration values, logging the effective value of each setting.
 *
 * <p>Every setting is optional: when a key is absent from the file, the value the
 * corresponding field already holds is kept.
 *
 * @param propertiesFile path of the properties file to read
 * @throws IOException if the file cannot be opened or read
 * @throws IllegalArgumentException if the initial-position property is present but is not
 *         accepted by {@code InitialPositionInStream.valueOf}
 */
private static void configure(String propertiesFile) throws IOException {
    Properties properties = new Properties();
    FileInputStream inputStream = new FileInputStream(propertiesFile);
    try {
        properties.load(inputStream);
    } finally {
        // Always release the file handle, even if loading fails.
        inputStream.close();
    }

    // getProperty(key, default) returns the default when the key is absent, so each
    // field keeps its current value unless the file overrides it.
    topologyName = properties.getProperty(ConfigKeys.TOPOLOGY_NAME_KEY, topologyName);
    LOG.info("Using topology name " + topologyName);

    streamName = properties.getProperty(ConfigKeys.STREAM_NAME_KEY, streamName);
    LOG.info("Using stream name " + streamName);

    String initialPositionOverride = properties.getProperty(ConfigKeys.INITIAL_POSITION_IN_STREAM_KEY);
    if (initialPositionOverride != null) {
        // Properties.load keeps trailing whitespace in values; trim it so a stray space
        // after the constant name does not make valueOf reject an otherwise valid value.
        initialPositionInStream = InitialPositionInStream.valueOf(initialPositionOverride.trim());
    }
    LOG.info("Using initial position " + initialPositionInStream.toString() + " (if a checkpoint is not found).");

    zookeeperEndpoint = properties.getProperty(ConfigKeys.ZOOKEEPER_ENDPOINT_KEY, zookeeperEndpoint);
    LOG.info("Using zookeeper endpoint " + zookeeperEndpoint);

    zookeeperPrefix = properties.getProperty(ConfigKeys.ZOOKEEPER_PREFIX_KEY, zookeeperPrefix);
    LOG.info("Using zookeeper prefix " + zookeeperPrefix);

    elasticCacheRedisEndpoint = properties.getProperty(ConfigKeys.REDIS_ENDPOINT_KEY, elasticCacheRedisEndpoint);
    // Label this value as the Redis endpoint; it was previously logged as "zookeeper prefix".
    LOG.info("Using redis endpoint " + elasticCacheRedisEndpoint);

    regionName = properties.getProperty(ConfigKeys.REGION_NAME, regionName);
    LOG.info("Using Region " + regionName);
}