in src/main/java/com/uber/rss/clients/MultiServerAsyncWriteClient.java [189:226]
/**
 * Queues one data block for asynchronous upload.
 *
 * <p>On the first call the worker threads in {@code threads} are started. The block is then
 * wrapped via {@code createUploadRecord} and offered to the record queue selected from the
 * partition, waiting up to {@code networkTimeoutMillis} for space. Time spent waiting on the
 * queue is accumulated into {@code queueInsertTime} (nanoseconds). Queue sizes are logged at
 * most once per {@code logInterval} milliseconds.
 *
 * <p>NOTE(review): the {@code threadStarted} check is not synchronized here; this presumably
 * relies on the method being called from a single thread - confirm against callers.
 *
 * @param partition partition the block belongs to; used to choose the target client and queue
 * @param value     payload of the block
 * @throws RssAggregateException     if {@code exceptions} already holds recorded failures
 *                                   (presumably reported by the worker threads - confirm)
 * @throws RssQueueNotReadyException if the queue has no free space within
 *                                   {@code networkTimeoutMillis}
 * @throws RssException              if the calling thread is interrupted while waiting; the
 *                                   thread's interrupt status is restored before throwing
 */
public void writeDataBlock(int partition, ByteBuffer value) {
    // Lazily start the worker threads on first use.
    if (!threadStarted) {
        for (Thread thread : threads) {
            thread.start();
        }
        threadStarted = true;
    }

    // Fail fast if failures have already been recorded.
    if (!exceptions.isEmpty()) {
        throw new RssAggregateException(exceptions);
    }

    int clientIndex = partition % clients.length;
    if (partitionFanout > 1) {
        // With fanout, shift the client by an offset derived from the task attempt id.
        clientIndex = (clientIndex + (int) (currentAppTaskAttemptId.getTaskAttemptId() % partitionFanout)) % clients.length;
    }

    // Several clients may share one worker thread/queue.
    int threadIndex = clientIndex % threads.length;
    BlockingQueue<Record> recordQueue = recordQueues[threadIndex];

    try {
        long startTime = System.nanoTime();
        boolean inserted = recordQueue.offer(createUploadRecord(partition, value, clientIndex), networkTimeoutMillis, TimeUnit.MILLISECONDS);
        queueInsertTime.addAndGet(System.nanoTime() - startTime);
        if (!inserted) {
            throw new RssQueueNotReadyException(String.format("sendRecord: Record queue has no space available after waiting %s millis", networkTimeoutMillis));
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe the
        // cancellation request after we translate it into an RssException.
        Thread.currentThread().interrupt();
        throw new RssException("Interrupted when inserting to record queue", e);
    }

    // Periodically report queue depth.
    long currentTime = System.currentTimeMillis();
    if (currentTime - lastLogTime > logInterval) {
        for (int i = 0; i < recordQueues.length; i++) {
            logger.info(String.format("Record queue %s size: %s", i, recordQueues[i].size()));
        }
        lastLogTime = currentTime;
    }
}