in client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java [61:174]
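  /**
   * Reads one partition location directly from DFS (HDFS). If endMapIndex is not
   * Integer.MAX_VALUE, only the requested range of map outputs is read, which requires the
   * shuffle file to be sorted on the worker first; otherwise the whole file is read through
   * its unsorted index.
   */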
public DfsPartitionReader(
CelebornConf conf,
String shuffleKey,
PartitionLocation location,
TransportClientFactory clientFactory,
int startMapIndex,
int endMapIndex)
throws IOException {
shuffleChunkSize = (int) conf.shuffleChunkSize();
fetchMaxReqsInFlight = conf.clientFetchMaxReqsInFlight();
results = new LinkedBlockingQueue<>();
this.location = location;
final List<Long> chunkOffsets = new ArrayList<>();
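    // A bounded map range can only be served from the sorted file, so ask the worker to
    // sort it (OPEN_STREAM) before opening; a full read uses the raw file as written.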
if (endMapIndex != Integer.MAX_VALUE) {
long fetchTimeoutMs = conf.clientFetchTimeoutMs();
try {
TransportClient client =
clientFactory.createClient(location.getHost(), location.getFetchPort());
TransportMessage openStream =
new TransportMessage(
MessageType.OPEN_STREAM,
PbOpenStream.newBuilder()
.setShuffleKey(shuffleKey)
.setFileName(location.getFileName())
.setStartIndex(startMapIndex)
.setEndIndex(endMapIndex)
.build()
.toByteArray());
ByteBuffer response = client.sendRpcSync(openStream.toByteBuffer(), fetchTimeoutMs);
        // Parse the response to make sure the file has been sorted on the worker side.
        TransportMessage.fromByteBuffer(response).getParsedPayload();
} catch (IOException | InterruptedException e) {
throw new IOException(
"read shuffle file from HDFS failed, filePath: "
+ location.getStorageInfo().getFilePath(),
e);
}
hdfsInputStream =
ShuffleClient.getHdfsFs(conf)
.open(new Path(Utils.getSortedFilePath(location.getStorageInfo().getFilePath())));
chunkOffsets.addAll(
getChunkOffsetsFromSortedIndex(conf, location, startMapIndex, endMapIndex));
} else {
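      // Full read: open the raw shuffle file and take chunk offsets from its unsorted index.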
hdfsInputStream =
ShuffleClient.getHdfsFs(conf).open(new Path(location.getStorageInfo().getFilePath()));
chunkOffsets.addAll(getChunkOffsetsFromUnsortedIndex(conf, location));
}
logger.debug(
"DFS {} index count:{} offsets:{}",
location.getStorageInfo().getFilePath(),
chunkOffsets.size(),
chunkOffsets);
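    // N + 1 offsets describe N chunks; only start the fetch thread when there is data to read.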
if (chunkOffsets.size() > 1) {
numChunks = chunkOffsets.size() - 1;
fetchThread =
new Thread(
() -> {
try {
while (!closed && currentChunkIndex < numChunks) {
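                    // Simple backpressure: keep at most fetchMaxReqsInFlight chunks buffered.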
while (results.size() >= fetchMaxReqsInFlight) {
Thread.sleep(50);
}
long offset = chunkOffsets.get(currentChunkIndex);
long length = chunkOffsets.get(currentChunkIndex + 1) - offset;
logger.debug("read {} offset {} length {}", currentChunkIndex, offset, length);
byte[] buffer = new byte[(int) length];
try {
hdfsInputStream.readFully(offset, buffer);
                    } catch (IOException e) {
                      logger.warn(
                          "read HDFS {} failed, will retry",
                          location.getStorageInfo().getFilePath(),
                          e);
                      try {
                        // Re-open the same file that was opened in the constructor: the sorted
                        // file for a ranged read, the raw shuffle file otherwise.
                        hdfsInputStream.close();
                        String retryPath =
                            endMapIndex != Integer.MAX_VALUE
                                ? Utils.getSortedFilePath(location.getStorageInfo().getFilePath())
                                : location.getStorageInfo().getFilePath();
                        hdfsInputStream = ShuffleClient.getHdfsFs(conf).open(new Path(retryPath));
                        hdfsInputStream.readFully(offset, buffer);
                      } catch (IOException ex) {
                        logger.warn(
                            "retry read HDFS {} failed",
                            location.getStorageInfo().getFilePath(),
                            ex);
                        exception.set(ex);
                        break;
}
}
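                    // Queue the chunk for the reader; wrappedBuffer wraps the array without copying.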
results.put(Unpooled.wrappedBuffer(buffer));
logger.debug("add index {} to results", currentChunkIndex++);
}
                } catch (Exception e) {
                  // The fetch may be cancelled by a speculative task; ignore this exception.
                  logger.warn("Fetch thread is cancelled.", e);
}
logger.debug("fetch {} is done.", location.getStorageInfo().getFilePath());
},
"Dfs-fetch-thread" + location.getStorageInfo().getFilePath());
fetchThread.setUncaughtExceptionHandler(
new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
logger.error("thread {} failed with exception {}", t, e);
}
});
fetchThread.start();
logger.debug("Start dfs read on location {}", location);
}
}