in worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala [177:241]
private def handleOpenStreamInternal(
client: TransportClient,
shuffleKey: String,
fileName: String,
startIndex: Int,
endIndex: Int,
initialCredit: Int,
request: RpcRequest,
isLegacy: Boolean): Unit = {
try {
var fileInfo = getRawFileInfo(shuffleKey, fileName)
fileInfo.getPartitionType match {
case PartitionType.REDUCE =>
if (endIndex != Integer.MAX_VALUE) {
fileInfo = partitionsSorter.getSortedFileInfo(
shuffleKey,
fileName,
fileInfo,
startIndex,
endIndex)
}
logDebug(s"Received chunk fetch request $shuffleKey $fileName $startIndex " +
s"$endIndex get file info $fileInfo from client channel " +
s"${NettyUtils.getRemoteAddress(client.getChannel)}")
if (fileInfo.isHdfs) {
replyStreamHandler(client, request.requestId, 0, 0, isLegacy)
} else {
val buffers = new FileManagedBuffers(fileInfo, transportConf)
val fetchTimeMetrics = storageManager.getFetchTimeMetric(fileInfo.getFile)
val streamId = chunkStreamManager.registerStream(
shuffleKey,
buffers,
fetchTimeMetrics)
if (fileInfo.numChunks() == 0)
logDebug(s"StreamId $streamId, fileName $fileName, mapRange " +
s"[$startIndex-$endIndex] is empty. Received from client channel " +
s"${NettyUtils.getRemoteAddress(client.getChannel)}")
else logDebug(
s"StreamId $streamId, fileName $fileName, numChunks ${fileInfo.numChunks()}, " +
s"mapRange [$startIndex-$endIndex]. Received from client channel " +
s"${NettyUtils.getRemoteAddress(client.getChannel)}")
replyStreamHandler(client, request.requestId, streamId, fileInfo.numChunks(), isLegacy)
}
case PartitionType.MAP =>
val creditStreamHandler =
new Consumer[java.lang.Long] {
override def accept(streamId: java.lang.Long): Unit = {
replyStreamHandler(client, request.requestId, streamId, 0, isLegacy)
}
}
creditStreamManager.registerStream(
creditStreamHandler,
client.getChannel,
initialCredit,
startIndex,
endIndex,
fileInfo)
case PartitionType.MAPGROUP =>
}
} catch {
case e: IOException =>
handleRpcIOException(client, request.requestId, shuffleKey, fileName, e)
}
}