private void startThread()

in pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java [86:139]


    private void startThread(TwitterFireHoseConfig config) {

        BasicClient client = new ClientBuilder()
                .name(config.getClientName())
                .hosts(config.getClientHosts())
                .endpoint(getEndpoint(config))
                .authentication(getAuthentication(config))
                .processor(new HosebirdMessageProcessor() {
                    public DelimitedStreamReader reader;

                    @Override
                    public void setup(InputStream input) {
                        reader = new DelimitedStreamReader(input, Constants.DEFAULT_CHARSET,
                            config.getClientBufferSize());
                    }

                    @Override
                    public boolean process() throws IOException, InterruptedException {
                        String tweetStr = reader.readLine();
                        try {
                            TweetData tweet = mapper.readValue(tweetStr, TweetData.class);
                            // We don't really care if the record succeeds or not.
                            // However might be in the future to count failures
                            // TODO:- Figure out the metrics story for connectors
                            consume(new TwitterRecord(tweet, config.getGuestimateTweetTime()));
                        } catch (Exception e) {
                            LOG.error("Exception thrown", e);
                        }
                        return true;
                    }
                })
                .build();

        Thread runnerThread = new Thread(() -> {
            LOG.info("Started the Twitter FireHose Runner Thread");
            client.connect();
            LOG.info("Twitter Streaming API connection established successfully");

            // just wait now
            try {
                synchronized (waitObject) {
                    waitObject.wait();
                }
            } catch (Exception e) {
                LOG.info("Got a exception in waitObject");
            }
            LOG.debug("Closing Twitter Streaming API connection");
            client.stop();
            LOG.info("Twitter Streaming API connection closed");
            LOG.info("Twitter FireHose Runner Thread ending");
        });
        runnerThread.setName("TwitterFireHoseRunner");
        runnerThread.start();
    }