public void writeDataBlock()

in src/main/java/com/uber/rss/clients/MultiServerAsyncWriteClient.java [189:226]


    /**
     * Enqueues one data block for the given partition onto one of the record queues.
     *
     * <p>On the first invocation every thread in {@code threads} is started. If any
     * exception has been collected in {@code exceptions} (presumably by those threads —
     * confirm against the thread bodies), it is rethrown here as an aggregate before
     * anything is queued.
     *
     * <p>The target client is chosen as {@code partition % clients.length}; when
     * {@code partitionFanout > 1} the index is additionally shifted by the current task
     * attempt id modulo the fanout. The queue is then chosen as
     * {@code clientIndex % threads.length}.
     *
     * @param partition partition id the block belongs to
     * @param value     payload of the block
     * @throws RssAggregateException     if {@code exceptions} is non-empty on entry
     * @throws RssQueueNotReadyException if the record could not be inserted into the queue
     *                                   within {@code networkTimeoutMillis} milliseconds
     * @throws RssException              if the calling thread is interrupted while waiting
     *                                   for queue space; the thread's interrupt status is
     *                                   restored before this exception is thrown
     */
    public void writeDataBlock(int partition, ByteBuffer value) {
        // Threads are started lazily, on the first block written.
        if (!threadStarted) {
            for (Thread thread : threads) {
                thread.start();
            }
            threadStarted = true;
        }

        // Fail fast on previously recorded errors instead of queueing more data.
        if (!exceptions.isEmpty()) {
            throw new RssAggregateException(exceptions);
        }

        int clientIndex = partition % clients.length;
        if (partitionFanout > 1) {
            // Shift the client choice by the task attempt id, bounded by the fanout.
            clientIndex = (clientIndex + (int) (currentAppTaskAttemptId.getTaskAttemptId() % partitionFanout)) % clients.length;
        }

        int threadIndex = clientIndex % threads.length;
        BlockingQueue<Record> recordQueue = recordQueues[threadIndex];
        try {
            long startTime = System.nanoTime();
            boolean inserted = recordQueue.offer(createUploadRecord(partition, value, clientIndex), networkTimeoutMillis, TimeUnit.MILLISECONDS);
            queueInsertTime.addAndGet(System.nanoTime() - startTime);
            if (!inserted) {
                throw new RssQueueNotReadyException(String.format("sendRecord: Record queue has no space available after waiting %s millis", networkTimeoutMillis));
            }
        } catch (InterruptedException e) {
            // offer() clears the interrupt flag when it throws; restore it so that code
            // further up the stack can still observe that this thread was interrupted.
            Thread.currentThread().interrupt();
            throw new RssException("Interrupted when inserting to record queue", e);
        }

        // Log queue sizes at most once per logInterval milliseconds.
        long currentTime = System.currentTimeMillis();
        if (currentTime - lastLogTime > logInterval) {
            for (int i = 0; i < recordQueues.length; i++) {
                logger.info(String.format("Record queue %s size: %s", i, recordQueues[i].size()));
            }
            lastLogTime = currentTime;
        }
    }