in local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/TwitterHistoryElasticsearch.java [69:83]
/**
 * Assembles a three-stage stream and starts it.
 *
 * <p>The stages are a {@link TwitterTimelineProvider} (source), an
 * {@link ActivityConverterProcessor} (transformation) and an
 * {@link ElasticsearchPersistWriter} (sink). Each stage is registered with the builder
 * under the canonical name of its class, and that same name is what the downstream
 * stage passes to refer to its input.
 */
public void run() {
    // Stream components, built from the corresponding sections of this example's config.
    TwitterTimelineProvider timelineProvider = new TwitterTimelineProvider(config.getTwitter());
    ActivityConverterProcessor activityConverter = new ActivityConverterProcessor();
    ElasticsearchPersistWriter elasticsearchWriter = new ElasticsearchPersistWriter(config.getElasticsearch());

    // Map the detected configuration onto the runtime configuration bean for the local builder.
    LocalRuntimeConfiguration runtimeConfig = StreamsJacksonMapper.getInstance()
        .convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class);
    StreamBuilder streamBuilder = new LocalStreamBuilder(runtimeConfig);

    // Identifiers used both to register a stage and to reference it from the next stage.
    final String providerId = TwitterTimelineProvider.class.getCanonicalName();
    final String converterId = ActivityConverterProcessor.class.getCanonicalName();
    final String writerId = ElasticsearchPersistWriter.class.getCanonicalName();

    // Wire provider -> converter -> writer.
    // NOTE(review): the integer arguments (2 and 1) are presumably per-stage task counts;
    // confirm against the StreamBuilder API.
    streamBuilder.newPerpetualStream(providerId, timelineProvider);
    streamBuilder.addStreamsProcessor(converterId, activityConverter, 2, providerId);
    streamBuilder.addStreamsPersistWriter(writerId, elasticsearchWriter, 1, converterId);

    streamBuilder.start();
}