in client/src/main/java/org/apache/uniffle/client/record/reader/RMRecordsReader.java [476:562]
/**
 * Fetch loop: repeatedly pulls sorted shuffle data for {@code partitionId} from the
 * current shuffle server, deserializes the records, and hands filled buffers to
 * {@code nextQueue}. The loop exits when all merged data has been consumed
 * (producer-done is signaled), when the server reports an error/unexpected state
 * (fail over via {@code nextShuffleServerInfo()}), or when any exception is caught.
 */
public void run() {
  while (!stop) {
    try {
      RssGetSortedShuffleDataRequest request =
          new RssGetSortedShuffleDataRequest(
              appId, shuffleId, partitionId, blockId, retryMax, retryIntervalMax);
      RssGetSortedShuffleDataResponse response = client.getSortedShuffleData(request);
      if (response.getStatusCode() != StatusCode.SUCCESS
          || response.getMergeState() == MergeState.INTERNAL_ERROR.code()) {
        // Server-side failure: record the message and fail over to the next server.
        fetchError = response.getMessage();
        nextShuffleServerInfo();
        break;
      } else if (response.getMergeState() == MergeState.INITED.code()) {
        // Merge was never started on this server — treat as a fetch error.
        fetchError = "Remote merge should be started!";
        nextShuffleServerInfo();
        break;
      }
      if (response.getMergeState() == MergeState.MERGING.code()
          && response.getNextBlockId() == -1) {
        // All merged data has been read, but there may be data that has not yet been merged.
        // Back off with exponential sleep (capped at maxFetchSleepTime) and retry.
        LOG.info("RMRecordsFetcher will sleep {} ms", sleepTime);
        Thread.sleep(this.sleepTime);
        this.sleepTime = Math.min(this.sleepTime * 2, maxFetchSleepTime);
      } else if (response.getMergeState() == MergeState.DONE.code()
          && response.getNextBlockId() == -1) {
        // All data has been read. Flush the trailing partial buffer and signal completion.
        if (recordBuffer.size() > 0) {
          nextQueue.put(recordBuffer);
        }
        nextQueue.setProducerDone(true);
        break;
      } else if (response.getMergeState() == MergeState.DONE.code()
          || response.getMergeState() == MergeState.MERGING.code()) {
        // A block is available: reset the backoff, advance the cursor, and parse records.
        this.sleepTime = initFetchSleepTime;
        blockId = response.getNextBlockId();
        ManagedBuffer managedBuffer = null;
        ByteBuf byteBuf = null;
        RecordsReader<K, V> reader = null;
        try {
          managedBuffer = response.getData();
          byteBuf = managedBuffer.byteBuf();
          // Fetch blocks and parsing blocks are a synchronous process. If the two processes are
          // split into two different threads, then will be asynchronous processes. Although it
          // seems to save time, it actually consumes more memory.
          reader =
              new RecordsReader<>(
                  rssConf,
                  SerInputStream.newInputStream(byteBuf),
                  keyClass,
                  valueClass,
                  raw,
                  false);
          reader.init();
          while (reader.next()) {
            if (metrics != null) {
              metrics.incRecordsRead(1);
            }
            // Hand off a full buffer before adding, so a buffer never exceeds the cap.
            if (recordBuffer.size() >= maxRecordsNumPerBuffer) {
              nextQueue.put(recordBuffer);
              recordBuffer = new RecordBuffer<>(partitionId);
            }
            recordBuffer.addRecord(reader.getCurrentKey(), reader.getCurrentValue());
          }
        } finally {
          // Release in reverse-acquisition order. NOTE(review): byteBuf and managedBuffer
          // are released separately here — assumes ManagedBuffer.release() does not also
          // release the underlying ByteBuf; confirm against ManagedBuffer's contract.
          if (reader != null) {
            reader.close();
          }
          if (byteBuf != null) {
            byteBuf.release();
          }
          if (managedBuffer != null) {
            managedBuffer.release();
          }
        }
      } else {
        fetchError = "Receive wrong offset from server, offset is " + response.getNextBlockId();
        nextShuffleServerInfo();
        break;
      }
    } catch (InterruptedException e) {
      // Restore the interrupt status so owners of this thread can observe the interruption.
      Thread.currentThread().interrupt();
      error = e;
      stop = true;
      LOG.error("Found exception when fetch sorted record, caused by ", e);
    } catch (Throwable e) {
      error = e;
      stop = true;
      // This terminates the fetcher, so log at ERROR (was INFO).
      LOG.error("Found exception when fetch sorted record, caused by ", e);
    }
  }
}