in streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java [230:314]
/**
 * Resolves the streaming endpoint and the credentials from {@code config}, then builds
 * the streaming {@code client} and allocates {@code providerQueue}.
 *
 * <p>Endpoint selection, driven by {@code config.getEndpoint()}:
 * <ul>
 *   <li>{@code "userstream"}: a {@code UserstreamEndpoint} on {@code Constants.USERSTREAM_HOST}.</li>
 *   <li>{@code "sample"}: a {@code StatusesFilterEndpoint} when track terms and/or followings
 *       are configured, otherwise a {@code StatusesSampleEndpoint}; both on
 *       {@code Constants.STREAM_HOST}.</li>
 *   <li>any value ending in {@code "firehose"}: a {@code StatusesFirehoseEndpoint} on
 *       {@code Constants.STREAM_HOST}.</li>
 * </ul>
 *
 * <p>Credentials: basic auth is used when {@code config.getBasicauth()} is present, otherwise
 * OAuth when {@code config.getOauth()} is present.
 *
 * <p>If no endpoint or no credentials can be resolved, an error is logged and the method
 * returns without building the client or the queue.
 *
 * @param configurationObject not read by this method
 * @throws NullPointerException if the endpoint name, or any field of the selected
 *     credential configuration, is {@code null}
 */
public void prepare(Object configurationObject) {
  // Read the endpoint name once; every branch below dispatches on it.
  final String endpointName =
      Objects.requireNonNull(config.getEndpoint(), "config.endpoint must not be null");

  final Hosts hosebirdHosts;
  if (endpointName.equals("userstream")) {
    hosebirdHosts = new HttpHosts(Constants.USERSTREAM_HOST);
    UserstreamEndpoint userstreamEndpoint = new UserstreamEndpoint();
    userstreamEndpoint.withFollowings(true);
    userstreamEndpoint.withUser(false);
    userstreamEndpoint.allReplies(false);
    endpoint = userstreamEndpoint;
  } else if (endpointName.equals("sample")) {
    hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
    boolean track = config.getTrack() != null && !config.getTrack().isEmpty();
    boolean follow = config.getFollow() != null && !config.getFollow().isEmpty();
    if (track || follow) {
      // Any configured filter switches "sample" over to the filter endpoint.
      LOGGER.debug("***\tPRESENT\t***");
      StatusesFilterEndpoint statusesFilterEndpoint = new StatusesFilterEndpoint();
      if (track) {
        statusesFilterEndpoint.trackTerms(config.getTrack());
      }
      if (follow) {
        statusesFilterEndpoint.followings(config.getFollow());
      }
      endpoint = statusesFilterEndpoint;
    } else {
      endpoint = new StatusesSampleEndpoint();
    }
  } else if (endpointName.endsWith("firehose")) {
    // Suffix match (not equals) is kept as-is so existing configurations keep working.
    hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
    endpoint = new StatusesFirehoseEndpoint();
  } else {
    LOGGER.error("NO ENDPOINT RESOLVED for endpoint={}", endpointName);
    return;
  }

  // Basic auth takes precedence over OAuth when both are configured.
  if (config.getBasicauth() != null) {
    auth = new BasicAuth(
        Objects.requireNonNull(config.getBasicauth().getUsername(),
            "config.basicauth.username must not be null"),
        Objects.requireNonNull(config.getBasicauth().getPassword(),
            "config.basicauth.password must not be null"));
  } else if (config.getOauth() != null) {
    // Validate all four fields up front so a missing one is reported by name.
    Objects.requireNonNull(config.getOauth().getConsumerKey(),
        "config.oauth.consumerKey must not be null");
    Objects.requireNonNull(config.getOauth().getConsumerSecret(),
        "config.oauth.consumerSecret must not be null");
    Objects.requireNonNull(config.getOauth().getAccessToken(),
        "config.oauth.accessToken must not be null");
    Objects.requireNonNull(config.getOauth().getAccessTokenSecret(),
        "config.oauth.accessTokenSecret must not be null");
    auth = new OAuth1(
        config.getOauth().getConsumerKey(),
        config.getOauth().getConsumerSecret(),
        config.getOauth().getAccessToken(),
        config.getOauth().getAccessTokenSecret());
  } else {
    LOGGER.error("NO AUTH RESOLVED");
    return;
  }

  LOGGER.debug("host={}\tendpoint={}\tauth={}", hosebirdHosts, endpoint, auth);

  // Bounded queue: capacity is MAX_BATCH.
  providerQueue = new LinkedBlockingQueue<>(MAX_BATCH);

  client = new ClientBuilder()
      .name("apache/streams/streams-contrib/streams-provider-twitter")
      .hosts(hosebirdHosts)
      .endpoint(endpoint)
      .authentication(auth)
      .connectionTimeout(1200000)
      .processor(processor)
      .build();
}