in worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala [239:343]
private def handleReduceOpenStreamInternal(
client: TransportClient,
shuffleKey: String,
fileName: String,
startIndex: Int,
endIndex: Int,
readLocalShuffle: Boolean = false): PbStreamHandlerOpt = {
try {
logDebug(s"Received open stream request $shuffleKey $fileName $startIndex " +
s"$endIndex get file name $fileName from client channel " +
s"${NettyUtils.getRemoteAddress(client.getChannel)}")
var fileInfo = getRawFileInfo(shuffleKey, fileName)
val streamId = chunkStreamManager.nextStreamId()
// We must get the sorted fileInfo in the following cases:
// 1. the current request is a non-range openStream, but the original unsorted file
// has been deleted by another range's openStream request;
// 2. the current request is a range openStream request.
if ((endIndex != Int.MaxValue && endIndex != -1 && endIndex >= startIndex) ||
(endIndex == Int.MaxValue && !fileInfo.addStream(streamId))) {
fileInfo = partitionsSorter.getSortedFileInfo(
shuffleKey,
fileName,
fileInfo,
startIndex,
endIndex)
}
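// The reduce file meta carries the chunk count and chunk offsets for this file.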
val meta = fileInfo.getReduceFileMeta
val streamHandler =
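// Local-read fast path: hand the client the file path and chunk offsets so it
// can read the sorted file from local disk instead of fetching chunks remotely.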
if (readLocalShuffle && !fileInfo.isInstanceOf[MemoryFileInfo]) {
chunkStreamManager.registerStream(
streamId,
shuffleKey,
fileName)
makeStreamHandler(
streamId,
meta.getNumChunks,
meta.getChunkOffsets,
fileInfo.asInstanceOf[DiskFileInfo].getFilePath)
} else fileInfo match {
case info: DiskFileInfo if info.isHdfs || info.isS3 || info.isOSS =>
// For files on DFS (HDFS/S3/OSS), the client reads the file directly,
// so the stream is registered with zero chunks to serve from this worker.
chunkStreamManager.registerStream(
streamId,
shuffleKey,
fileName)
makeStreamHandler(streamId, numChunks = 0)
case _ =>
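// Generic path: serve chunks through this worker. FileChunkBuffers streams
// chunks from local disk while MemoryChunkBuffers serves the in-memory file.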
val managedBuffer = fileInfo match {
case df: DiskFileInfo =>
new FileChunkBuffers(df, transportConf)
case mf: MemoryFileInfo =>
new MemoryChunkBuffers(mf)
}
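// Fetch-time metrics are only tracked for on-disk files; memory-backed files
// report no fetch-time metric.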
val fetchTimeMetric =
fileInfo match {
case info: DiskFileInfo =>
storageManager.getFetchTimeMetric(info.getFile)
case _ =>
null
}
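// Register the stream together with its chunk buffers so subsequent chunk
// fetch requests on this streamId can be served.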
chunkStreamManager.registerStream(
streamId,
shuffleKey,
managedBuffer,
fileName,
fetchTimeMetric)
if (meta.getNumChunks == 0)
logDebug(s"StreamId $streamId, fileName $fileName, mapRange " +
s"[$startIndex-$endIndex] is empty. Received from client channel " +
s"${NettyUtils.getRemoteAddress(client.getChannel)}")
else logDebug(
s"StreamId $streamId, fileName $fileName, numChunks ${meta.getNumChunks}, " +
s"mapRange [$startIndex-$endIndex]. Received from client channel " +
s"${NettyUtils.getRemoteAddress(client.getChannel)}")
makeStreamHandler(
streamId,
meta.getNumChunks)
}
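// All branches above produced a registered stream handler; count the success.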
workerSource.incCounter(WorkerSource.OPEN_STREAM_SUCCESS_COUNT)
PbStreamHandlerOpt.newBuilder().setStreamHandler(streamHandler)
.setStatus(StatusCode.SUCCESS.getValue)
.build()
} catch {
case e: IOException =>
val msg =
s"Failed to read file $fileName with shuffleKey $shuffleKey for client ${NettyUtils.getRemoteAddress(
client.getChannel)}, Exception: ${e.getMessage}"
// Log the failure before replying with OPEN_STREAM_FAILED.
logWarning(msg, e)
PbStreamHandlerOpt.newBuilder().setStatus(StatusCode.OPEN_STREAM_FAILED.getValue)
.setErrorMsg(msg).build()
} finally {
workerSource.stopTimer(WorkerSource.OPEN_STREAM_TIME, shuffleKey)
}
}