in local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java [64:92]
/**
 * Assembles the Twitter-follow-to-Neo4j pipeline and starts it on a
 * {@link LocalStreamBuilder}.
 *
 * <p>The stages are chained in this order:
 * {@link TwitterFollowingProvider} &rarr; {@link TypeConverterProcessor}
 * (constructed with {@code String.class}) &rarr; {@link ActivityConverterProcessor}
 * &rarr; {@link Neo4jBoltPersistWriter}. Provider and writer settings are read
 * from this instance's {@code config}; the local runtime settings come from
 * {@code StreamsConfigurator.detectConfiguration()}.
 */
public void run() {
    // Source stage, configured from the Twitter section of this job's config.
    TwitterFollowingProvider provider = new TwitterFollowingProvider(config.getTwitter());

    // Processor constructed with String.class as its target type.
    TypeConverterProcessor typeConverter = new TypeConverterProcessor(String.class);

    // Single-element lists restricting activity conversion to the Twitter
    // classifier and the follow converter.
    List<DocumentClassifier> documentClassifiers =
        Stream.<DocumentClassifier>of(new TwitterDocumentClassifier()).collect(Collectors.toList());
    List<ActivityConverter> activityConverters =
        Stream.<ActivityConverter>of(new TwitterFollowActivityConverter()).collect(Collectors.toList());
    ActivityConverterProcessor activityConverter = new ActivityConverterProcessor(
        new ActivityConverterProcessorConfiguration()
            .withClassifiers(documentClassifiers)
            .withConverters(activityConverters));

    // Sink stage. The writer is prepared explicitly here, before it is
    // registered with the builder.
    // NOTE(review): the builder may also prepare its components - confirm
    // whether this explicit call is still required.
    Neo4jConfiguration graphConfig = config.getNeo4j();
    Neo4jBoltPersistWriter neo4jWriter = new Neo4jBoltPersistWriter(graphConfig);
    neo4jWriter.prepare(graphConfig);

    // Runtime settings are mapped from the detected streams configuration.
    LocalRuntimeConfiguration runtimeConfig =
        StreamsJacksonMapper.getInstance().convertValue(
            StreamsConfigurator.detectConfiguration(),
            LocalRuntimeConfiguration.class);
    StreamBuilder pipeline = new LocalStreamBuilder(runtimeConfig);

    // Every stage is registered under its class's canonical name; each
    // downstream stage passes the preceding stage's id as its final argument.
    String providerId = TwitterFollowingProvider.class.getCanonicalName();
    String typeConverterId = TypeConverterProcessor.class.getCanonicalName();
    String activityConverterId = ActivityConverterProcessor.class.getCanonicalName();
    String writerId = Neo4jBoltPersistWriter.class.getCanonicalName();

    pipeline.newPerpetualStream(providerId, provider);
    pipeline.addStreamsProcessor(typeConverterId, typeConverter, 1, providerId);
    pipeline.addStreamsProcessor(activityConverterId, activityConverter, 1, typeConverterId);
    pipeline.addStreamsPersistWriter(writerId, neo4jWriter, 1, activityConverterId);

    pipeline.start();
}