in pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java [86:139]
/**
 * Builds the streaming client from {@code config} and launches the background
 * thread that owns the connection.
 *
 * <p>The client is given a message processor that reads one line at a time from
 * the stream, deserializes it into a {@code TweetData} and hands it to
 * {@code consume(...)} wrapped in a {@code TwitterRecord}. Lines that cannot be
 * deserialized are logged and skipped; the processor always asks to keep going.
 *
 * <p>The runner thread connects the client, then blocks on {@code waitObject}
 * until it is notified or interrupted, and finally stops the client.
 *
 * @param config connector configuration supplying the client name, hosts,
 *               buffer size and the values used by {@code getEndpoint} and
 *               {@code getAuthentication}
 */
private void startThread(TwitterFireHoseConfig config) {
    BasicClient client = new ClientBuilder()
            .name(config.getClientName())
            .hosts(config.getClientHosts())
            .endpoint(getEndpoint(config))
            .authentication(getAuthentication(config))
            .processor(new HosebirdMessageProcessor() {
                // Only used by this anonymous processor; assigned in setup().
                private DelimitedStreamReader reader;

                @Override
                public void setup(InputStream input) {
                    reader = new DelimitedStreamReader(input, Constants.DEFAULT_CHARSET,
                            config.getClientBufferSize());
                }

                @Override
                public boolean process() throws IOException, InterruptedException {
                    String tweetStr = reader.readLine();
                    // Nothing to parse: skip quietly instead of letting the mapper fail and
                    // logging an error with a stack trace for a line that carries no tweet.
                    // NOTE(review): blank lines are presumably stream keep-alives - confirm.
                    if (tweetStr == null || tweetStr.trim().isEmpty()) {
                        return true;
                    }
                    try {
                        TweetData tweet = mapper.readValue(tweetStr, TweetData.class);
                        // We don't really care if the record succeeds or not.
                        // However might be in the future to count failures
                        // TODO:- Figure out the metrics story for connectors
                        consume(new TwitterRecord(tweet, config.getGuestimateTweetTime()));
                    } catch (Exception e) {
                        // Best effort: one bad message must not terminate the stream.
                        LOG.error("Exception thrown", e);
                    }
                    return true;
                }
            })
            .build();

    Thread runnerThread = new Thread(() -> {
        LOG.info("Started the Twitter FireHose Runner Thread");
        client.connect();
        LOG.info("Twitter Streaming API connection established successfully");
        boolean interrupted = false;
        try {
            // Park this thread until someone notifies waitObject (or interrupts us).
            // NOTE(review): wait() is not guarded by a condition, so a notification sent
            // before this point is missed and a spurious wakeup ends the connection early;
            // fixing that needs a shared flag owned by whoever calls notify().
            synchronized (waitObject) {
                waitObject.wait();
            }
        } catch (InterruptedException e) {
            interrupted = true;
            LOG.info("Got a exception in waitObject", e);
        } finally {
            // Always release the connection, however the wait ended.
            LOG.debug("Closing Twitter Streaming API connection");
            client.stop();
            LOG.info("Twitter Streaming API connection closed");
            LOG.info("Twitter FireHose Runner Thread ending");
            if (interrupted) {
                // Restore the flag only after stop() so that any blocking it performs
                // is not cut short by the pending interrupt.
                Thread.currentThread().interrupt();
            }
        }
    });
    runnerThread.setName("TwitterFireHoseRunner");
    runnerThread.start();
}