public void run()

in local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java [64:92]


  public void run() {

    TwitterFollowingConfiguration twitterFollowingConfiguration = config.getTwitter();
    TwitterFollowingProvider followingProvider = new TwitterFollowingProvider(twitterFollowingConfiguration);
    TypeConverterProcessor converter = new TypeConverterProcessor(String.class);

    List<DocumentClassifier> classifiers = Stream.of((DocumentClassifier) new TwitterDocumentClassifier()).collect(Collectors.toList());
    List<ActivityConverter> converters = Stream.of((ActivityConverter) new TwitterFollowActivityConverter()).collect(Collectors.toList());
    ActivityConverterProcessorConfiguration activityConverterProcessorConfiguration =
        new ActivityConverterProcessorConfiguration()
            .withClassifiers(classifiers)
            .withConverters(converters);
    ActivityConverterProcessor activity = new ActivityConverterProcessor(activityConverterProcessorConfiguration);

    Neo4jConfiguration neo4jConfiguration = config.getNeo4j();
    Neo4jBoltPersistWriter graphPersistWriter = new Neo4jBoltPersistWriter(neo4jConfiguration);
    graphPersistWriter.prepare(neo4jConfiguration);

    LocalRuntimeConfiguration localRuntimeConfiguration =
        StreamsJacksonMapper.getInstance().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class);
    StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration);

    builder.newPerpetualStream(TwitterFollowingProvider.class.getCanonicalName(), followingProvider);
    builder.addStreamsProcessor(TypeConverterProcessor.class.getCanonicalName(), converter, 1, TwitterFollowingProvider.class.getCanonicalName());
    builder.addStreamsProcessor(ActivityConverterProcessor.class.getCanonicalName(), activity, 1, TypeConverterProcessor.class.getCanonicalName());
    builder.addStreamsPersistWriter(Neo4jBoltPersistWriter.class.getCanonicalName(), graphPersistWriter, 1, ActivityConverterProcessor.class.getCanonicalName());

    builder.start();
  }