public void connect()

in src/main/java/com/uber/rss/clients/MultiServerAsyncWriteClient.java [126:180]


    /**
     * Connects to every server and prepares the record-sending threads.
     *
     * <p>Steps performed, in order:
     * <ol>
     *   <li>invoke {@code connectSingleClient} for each entry of {@code servers} through a parallel stream;</li>
     *   <li>verify that every element of {@code clients} is non-null;</li>
     *   <li>create one named thread per slot of {@code threads} and store it in that slot.</li>
     * </ol>
     *
     * <p>The threads are only created and stored here; this method does not call {@code start()} on them.
     *
     * @throws RssInvalidStateException if any element of {@code clients} is still null after the connect step
     */
    public void connect() {
        servers.parallelStream().forEach(this::connectSingleClient);

        // Synchronize so that reads of the clients array elements observe the latest values written by
        // other threads, see http://www.cs.umd.edu/~pugh/java/memoryModel/jsr-133-faq.html
        synchronized (clients) {
            // sanity check that clients are initialized correctly
            for (int i = 0; i < clients.length; i++) {
                if (clients[i] == null) {
                    throw new RssInvalidStateException(String.format("Client %s is null", i));
                }
            }
        }

        for (int i = 0; i < threads.length; i++) {
            final int threadIndex = i;
            Thread thread = new Thread(() -> runRecordThread(threadIndex));
            thread.setName("Record Thread " + i);
            threads[threadIndex] = thread;
        }
    }

    /**
     * Body of one record thread: polls {@code recordQueues[threadIndex]} and writes each polled record
     * through the client selected by the record's {@code clientIndex}.
     *
     * <p>The loop ends when a stop marker record is polled, when {@code exceptions} becomes non-empty,
     * or when anything is thrown. A thrown {@link Throwable} is logged, reported to {@code M3Stats} and
     * added to {@code exceptions}; it is not rethrown. Afterwards, any records still in the queue are
     * reported as an {@code RssQueueNotReadyException} in {@code exceptions} and the queue is cleared.
     *
     * @param threadIndex index of this thread, which is also the index of the queue it consumes
     */
    private void runRecordThread(int threadIndex) {
        logger.info(String.format("Record Thread %s started", threadIndex));
        BlockingQueue<Record> recordQueue = recordQueues[threadIndex];
        try {
            // TODO optimize the max wait time for poll
            // 4L forces the multiplication into long arithmetic so a large int timeout cannot overflow.
            long pollMaxWait = 4L * networkTimeoutMillis;
            while (exceptions.isEmpty()) {
                long startTime = System.nanoTime();
                // TODO optimize here to restart thread if there is new record?
                Record record = recordQueue.poll(pollMaxWait, TimeUnit.MILLISECONDS);
                queuePollTime.addAndGet(System.nanoTime() - startTime);
                if (record != null) {
                    if (record.isStopMarker) {
                        break;
                    }
                    ReplicatedWriteClient writeClient = clients[record.clientIndex];
                    startTime = System.nanoTime();
                    writeClient.writeDataBlock(record.partition, record.value);
                    socketTime.addAndGet(System.nanoTime() - startTime);
                } else {
                    logger.info("Record queue {} has no record after waiting {} millis", threadIndex, pollMaxWait);
                }
            }
        } catch (Throwable e) {
            if (e instanceof InterruptedException) {
                // poll() cleared the interrupt flag when it threw; restore it so the interruption
                // stays observable on this thread instead of being silently swallowed.
                Thread.currentThread().interrupt();
            }
            logger.warn(String.format("Record Thread %s got exception, %s", threadIndex, ExceptionUtils.getSimpleMessage(e)), e);
            M3Stats.addException(e, this.getClass().getSimpleName());
            exceptions.add(e);
        }
        // Anything left in the queue was never written; surface that as an error before discarding it.
        int remainingRecords = recordQueue.size();
        if (remainingRecords > 0) {
            exceptions.add(new RssQueueNotReadyException(String.format("Record queue %s has %s remaining records not sent out", threadIndex, remainingRecords)));
        }
        recordQueue.clear();
        logger.info(String.format("Record Thread %s finished, remaining records: %s", threadIndex, remainingRecords));
    }