in worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala [88:175]
override def receive(client: TransportClient, msg: RequestMessage): Unit = {
msg match {
case r: BufferStreamEnd =>
handleEndStreamFromClient(r)
case r: ReadAddCredit =>
handleReadAddCredit(r)
case r: ChunkFetchRequest =>
handleChunkFetchRequest(client, r)
case r: RpcRequest =>
// process PbOpenStream RPC
var streamShuffleKey: String = null
var streamFileName: String = null
try {
val pbMsg = TransportMessage.fromByteBuffer(r.body().nioByteBuffer())
val pbOpenStream = pbMsg.getParsedPayload[PbOpenStream]
val (shuffleKey, fileName, startIndex, endIndex, initialCredit) =
(
pbOpenStream.getShuffleKey,
pbOpenStream.getFileName,
pbOpenStream.getStartIndex,
pbOpenStream.getEndIndex,
pbOpenStream.getInitialCredit)
streamShuffleKey = shuffleKey
streamFileName = fileName
workerSource.startTimer(WorkerSource.OPEN_STREAM_TIME, streamShuffleKey)
handleOpenStreamInternal(
client,
shuffleKey,
fileName,
startIndex,
endIndex,
initialCredit,
r,
false)
} catch {
case _: Exception =>
// process legacy OpenStream RPCs
logDebug("Open stream with legacy RPCs")
try {
val decodedMsg = Message.decode(r.body().nioByteBuffer())
val (shuffleKey, fileName) =
if (decodedMsg.`type`() == Type.OPEN_STREAM) {
val openStream = decodedMsg.asInstanceOf[OpenStream]
(
new String(openStream.shuffleKey, StandardCharsets.UTF_8),
new String(openStream.fileName, StandardCharsets.UTF_8))
} else {
val openStreamWithCredit = decodedMsg.asInstanceOf[OpenStreamWithCredit]
(
new String(openStreamWithCredit.shuffleKey, StandardCharsets.UTF_8),
new String(openStreamWithCredit.fileName, StandardCharsets.UTF_8))
}
streamShuffleKey = shuffleKey
streamFileName = fileName
var startIndex = 0
var endIndex = 0
var initialCredit = 0
getRawFileInfo(shuffleKey, fileName).getPartitionType match {
case PartitionType.REDUCE =>
startIndex = decodedMsg.asInstanceOf[OpenStream].startMapIndex
endIndex = decodedMsg.asInstanceOf[OpenStream].endMapIndex
case PartitionType.MAP =>
initialCredit = decodedMsg.asInstanceOf[OpenStreamWithCredit].initialCredit
startIndex = decodedMsg.asInstanceOf[OpenStreamWithCredit].startIndex
endIndex = decodedMsg.asInstanceOf[OpenStreamWithCredit].endIndex
case PartitionType.MAPGROUP =>
}
handleOpenStreamInternal(
client,
shuffleKey,
fileName,
startIndex,
endIndex,
initialCredit,
r,
true)
} catch {
case e: IOException =>
handleRpcIOException(client, r.requestId, streamShuffleKey, streamFileName, e)
}
} finally {
r.body().release()
workerSource.stopTimer(WorkerSource.OPEN_STREAM_TIME, streamShuffleKey)
}
case unknown: RequestMessage =>
throw new IllegalArgumentException(s"Unknown message type id: ${unknown.`type`.id}")
}
}