in src/main/java/com/uber/rss/clients/MultiServerAsyncWriteClient.java [126:180]
public void connect() {
servers.parallelStream().forEach(t -> connectSingleClient(t));
// use synchronize to make sure reads on clients array element getting latest value from other threads
// see http://www.cs.umd.edu/~pugh/java/memoryModel/jsr-133-faq.html
synchronized (clients) {
// sanity check that clients are initialized correctly
for (int i = 0; i < clients.length; i++) {
if (clients[i] == null) {
throw new RssInvalidStateException(String.format("Client %s is null", i));
}
}
}
for (int i = 0; i < threads.length; i++) {
final int threadIndex = i;
Thread thread = new Thread(() -> {
logger.info(String.format("Record Thread %s started", threadIndex));
BlockingQueue<Record> recordQueue = recordQueues[threadIndex];
try {
// TODO optimize the max wait time for poll
long pollMaxWait = networkTimeoutMillis * 4;
while (exceptions.isEmpty()) {
long startTime = System.nanoTime();
// TODO optimize here to restart thread if there is new record?
Record record = recordQueue.poll(pollMaxWait, TimeUnit.MILLISECONDS);
queuePollTime.addAndGet(System.nanoTime() - startTime);
if (record != null) {
if (record.isStopMarker) {
break;
}
ReplicatedWriteClient writeClient = clients[record.clientIndex];
startTime = System.nanoTime();
writeClient.writeDataBlock(record.partition, record.value);
socketTime.addAndGet(System.nanoTime() - startTime);
} else {
logger.info("Record queue {} has no record after waiting {} millis", threadIndex, pollMaxWait);
}
}
} catch (Throwable e) {
logger.warn(String.format("Record Thread %s got exception, %s", threadIndex, ExceptionUtils.getSimpleMessage(e)), e);
M3Stats.addException(e, this.getClass().getSimpleName());
exceptions.add(e);
}
int remainingRecords = recordQueue.size();
if (remainingRecords > 0) {
exceptions.add(new RssQueueNotReadyException(String.format("Record queue %s has %s remaining records not sent out", threadIndex, remainingRecords)));
}
recordQueue.clear();
logger.info(String.format("Record Thread %s finished, remaining records: %s", threadIndex, remainingRecords));
});
thread.setName("Record Thread " + i);
threads[threadIndex] = thread;
}
}