in client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java [84:182]
public DfsPartitionReader(
CelebornConf conf,
String shuffleKey,
PartitionLocation location,
PbStreamHandler pbStreamHandler,
TransportClientFactory clientFactory,
int startMapIndex,
int endMapIndex,
MetricsCallback metricsCallback,
int startChunkIndex,
int endChunkIndex,
Optional<PartitionReaderCheckpointMetadata> checkpointMetadata)
throws IOException {
this.conf = conf;
shuffleChunkSize = conf.dfsReadChunkSize();
fetchMaxReqsInFlight = conf.clientFetchMaxReqsInFlight();
results = new LinkedBlockingQueue<>();
this.metricsCallback = metricsCallback;
this.location = location;
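// Select the Hadoop FileSystem matching the partition's storage backend (S3, OSS, or HDFS).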
if (location.getStorageInfo() != null
&& location.getStorageInfo().getType() == StorageInfo.Type.S3) {
this.hadoopFs = ShuffleClient.getHadoopFs(conf).get(StorageInfo.Type.S3);
} else if (location.getStorageInfo() != null
&& location.getStorageInfo().getType() == StorageInfo.Type.OSS) {
this.hadoopFs = ShuffleClient.getHadoopFs(conf).get(StorageInfo.Type.OSS);
} else {
// Fall back to HDFS so hadoopFs is never left null before dfsInputStream is opened below.
this.hadoopFs = ShuffleClient.getHadoopFs(conf).get(StorageInfo.Type.HDFS);
}
long fetchTimeoutMs = conf.clientFetchTimeoutMs();
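// Reuse the stream handler supplied by the caller, or open a new stream on the worker via a synchronous RPC.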
try {
client = clientFactory.createClient(location.getHost(), location.getFetchPort());
if (pbStreamHandler == null) {
TransportMessage openStream =
new TransportMessage(
MessageType.OPEN_STREAM,
PbOpenStream.newBuilder()
.setShuffleKey(shuffleKey)
.setFileName(location.getFileName())
.setStartIndex(startMapIndex)
.setEndIndex(endMapIndex)
.build()
.toByteArray());
ByteBuffer response = client.sendRpcSync(openStream.toByteBuffer(), fetchTimeoutMs);
streamHandler = TransportMessage.fromByteBuffer(response).getParsedPayload();
// Parse this message to ensure sort is done.
} else {
streamHandler = pbStreamHandler;
}
} catch (IOException | InterruptedException e) {
throw new IOException(
"read shuffle file from DFS failed, filePath: " + location.getStorageInfo().getFilePath(),
e);
}
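// A bounded map index range reads the sorted shuffle file with its sorted index;
// otherwise read the original file using the unsorted index.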
if (endMapIndex != Integer.MAX_VALUE && endMapIndex != -1) {
dataFilePath = new Path(Utils.getSortedFilePath(location.getStorageInfo().getFilePath()));
dfsInputStream = hadoopFs.open(dataFilePath);
chunkOffsets.addAll(
getChunkOffsetsFromSortedIndex(conf, location, startMapIndex, endMapIndex));
} else {
dataFilePath = new Path(location.getStorageInfo().getFilePath());
dfsInputStream = hadoopFs.open(dataFilePath);
chunkOffsets.addAll(getChunkOffsetsFromUnsortedIndex(conf, location));
}
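// Clamp the requested chunk range; chunkOffsets holds chunk boundaries, so the last valid chunk index is size - 2.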
this.startChunkIndex = startChunkIndex == -1 ? 0 : startChunkIndex;
this.endChunkIndex =
endChunkIndex == -1
? chunkOffsets.size() - 2
: Math.min(chunkOffsets.size() - 2, endChunkIndex);
this.currentChunkIndex = this.startChunkIndex;
this.numChunks = this.endChunkIndex - this.startChunkIndex + 1;
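// Resume from the provided checkpoint metadata, or create fresh metadata when checkpointing is enabled.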
if (checkpointMetadata.isPresent()) {
this.partitionReaderCheckpointMetadata = checkpointMetadata;
this.returnedChunks = checkpointMetadata.get().getReturnedChunks().size();
} else {
this.partitionReaderCheckpointMetadata =
conf.isPartitionReaderCheckpointEnabled()
? Optional.of(new PartitionReaderCheckpointMetadata())
: Optional.empty();
}
logger.debug(
"DFS {} total offset count:{} chunk count: {} "
+ "start chunk index:{} end chunk index:{} offsets:{}",
location.getStorageInfo().getFilePath(),
chunkOffsets.size(),
this.numChunks,
this.startChunkIndex,
this.endChunkIndex,
chunkOffsets);
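// Only start the background fetch thread when there is at least one chunk to read.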
if (this.numChunks > 0) {
fetchThread =
ThreadUtils.newDaemonSingleThreadExecutor(
"celeborn-client-dfs-partition-fetcher" + location.getStorageInfo().getFilePath());
logger.debug("Start dfs read on location {}", location);
ShuffleClient.incrementTotalReadCounter();
}
}