public void prepare()

in streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java [230:314]


  /**
   * Resolves the streaming endpoint and credentials from {@code config} and builds the
   * streaming {@code client}.
   *
   * <p>Endpoint selection, driven by {@code config.getEndpoint()}:
   * <ul>
   *   <li>{@code "userstream"}: a {@code UserstreamEndpoint} on {@code Constants.USERSTREAM_HOST},
   *       configured with {@code withFollowings(true)}, {@code withUser(false)} and
   *       {@code allReplies(false)};</li>
   *   <li>{@code "sample"}: on {@code Constants.STREAM_HOST}; a {@code StatusesFilterEndpoint}
   *       when {@code config.getTrack()} or {@code config.getFollow()} is non-empty, otherwise a
   *       {@code StatusesSampleEndpoint};</li>
   *   <li>any value ending in {@code "firehose"}: a {@code StatusesFirehoseEndpoint} on
   *       {@code Constants.STREAM_HOST}.</li>
   * </ul>
   *
   * <p>Credentials: {@code config.getBasicauth()} takes precedence over {@code config.getOauth()}
   * when both are set.
   *
   * <p>If no endpoint or no credentials can be resolved, an error is logged and the method
   * returns early; in that case {@code providerQueue} and {@code client} are not assigned here.
   *
   * @param configurationObject not read by this method
   * @throws NullPointerException if the configured endpoint is {@code null}, or if any field of
   *     the selected credential block is {@code null}
   */
  public void prepare(Object configurationObject) {

    // Read the endpoint name once; the message identifies the missing setting on failure.
    final String endpointName =
        Objects.requireNonNull(config.getEndpoint(), "config.endpoint must not be null");

    final Hosts hosebirdHosts;
    if (endpointName.equals("userstream")) {

      hosebirdHosts = new HttpHosts(Constants.USERSTREAM_HOST);

      UserstreamEndpoint userstreamEndpoint = new UserstreamEndpoint();
      userstreamEndpoint.withFollowings(true);
      userstreamEndpoint.withUser(false);
      userstreamEndpoint.allReplies(false);
      endpoint = userstreamEndpoint;

    } else if (endpointName.equals("sample")) {

      hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);

      boolean track = config.getTrack() != null && !config.getTrack().isEmpty();
      boolean follow = config.getFollow() != null && !config.getFollow().isEmpty();

      if (track || follow) {
        // Any filter criteria switch the "sample" endpoint to the filter endpoint.
        LOGGER.debug("***\tPRESENT\t***");
        StatusesFilterEndpoint statusesFilterEndpoint = new StatusesFilterEndpoint();
        if (track) {
          statusesFilterEndpoint.trackTerms(config.getTrack());
        }
        if (follow) {
          statusesFilterEndpoint.followings(config.getFollow());
        }
        endpoint = statusesFilterEndpoint;
      } else {
        endpoint = new StatusesSampleEndpoint();
      }

    } else if (endpointName.endsWith("firehose")) {
      // Suffix match (not equality) is intentional to preserve existing behaviour.
      hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
      endpoint = new StatusesFirehoseEndpoint();
    } else {
      LOGGER.error("NO ENDPOINT RESOLVED");
      return;
    }

    if (config.getBasicauth() != null) {

      // Both credentials are validated before the BasicAuth instance is constructed.
      auth = new BasicAuth(
          Objects.requireNonNull(config.getBasicauth().getUsername(),
              "config.basicauth.username must not be null"),
          Objects.requireNonNull(config.getBasicauth().getPassword(),
              "config.basicauth.password must not be null")
      );

    } else if (config.getOauth() != null) {

      // All four OAuth values are validated before the OAuth1 instance is constructed.
      auth = new OAuth1(
          Objects.requireNonNull(config.getOauth().getConsumerKey(),
              "config.oauth.consumerKey must not be null"),
          Objects.requireNonNull(config.getOauth().getConsumerSecret(),
              "config.oauth.consumerSecret must not be null"),
          Objects.requireNonNull(config.getOauth().getAccessToken(),
              "config.oauth.accessToken must not be null"),
          Objects.requireNonNull(config.getOauth().getAccessTokenSecret(),
              "config.oauth.accessTokenSecret must not be null"));

    } else {
      LOGGER.error("NO AUTH RESOLVED");
      return;
    }

    LOGGER.debug("host={}\tendpoint={}\tauth={}", hosebirdHosts, endpoint, auth);

    // Bounded queue: capacity is MAX_BATCH.
    providerQueue = new LinkedBlockingQueue<>(MAX_BATCH);

    // NOTE(review): connectionTimeout is presumably in milliseconds (i.e. 20 minutes) —
    // confirm against the ClientBuilder API.
    client = new ClientBuilder()
        .name("apache/streams/streams-contrib/streams-provider-twitter")
        .hosts(hosebirdHosts)
        .endpoint(endpoint)
        .authentication(auth)
        .connectionTimeout(1200000)
        .processor(processor)
        .build();

  }