in worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala [1094:1177]
/**
 * Decodes the body of a map-partition control RPC and flattens the fields the
 * caller needs into a single tuple.
 *
 * The body is first parsed as a `TransportMessage` carrying a protobuf payload.
 * If anything on that path throws an `Exception` -- including the
 * `scala.MatchError` raised when the payload is none of the four protobuf types
 * listed below -- the body is read again and decoded as a legacy `Message`.
 * The legacy path only recognises hand-shake, region-start and region-finish;
 * any other legacy message escapes this method as a `scala.MatchError`.
 *
 * @param rpcRequest request whose body holds the encoded message
 * @return a tuple of, in order: the protobuf message (null on the legacy path),
 *         the legacy message (null on the protobuf path), whether the legacy
 *         path was taken, the message type, the mode, the shuffle key, the
 *         partition unique id, and a trailing flag that is true for hand-shake
 *         and region-start and false for region-finish and segment-start
 *         (NOTE(review): presumably tells the caller whether to run an extra
 *         check for this message kind -- confirm against the call site)
 */
private def mapPartitionRpcRequest(rpcRequest: RpcRequest)
    : (GeneratedMessageV3, Message, Boolean, Type, Mode, String, String, Boolean) = {
  type Decoded = (GeneratedMessageV3, Message, Boolean, Type, Mode, String, String, Boolean)

  // Result shape for a protobuf payload: legacy slot is null, legacy flag is false.
  def pbResult(
      payload: GeneratedMessageV3,
      messageType: Type,
      mode: Mode,
      shuffleKey: String,
      partitionUniqueId: String,
      trailingFlag: Boolean): Decoded =
    (payload, null, false, messageType, mode, shuffleKey, partitionUniqueId, trailingFlag)

  // Result shape for a legacy message: protobuf slot is null, legacy flag is true.
  def legacyResult(
      decoded: Message,
      messageType: Type,
      mode: Mode,
      shuffleKey: String,
      partitionUniqueId: String,
      trailingFlag: Boolean): Decoded =
    (null, decoded, true, messageType, mode, shuffleKey, partitionUniqueId, trailingFlag)

  try {
    val payload = TransportMessage
      .fromByteBuffer(rpcRequest.body().nioByteBuffer())
      .getParsedPayload
      .asInstanceOf[GeneratedMessageV3]

    // The match stays inside the try on purpose: an unmatched payload must fall
    // through to the legacy decoder below, exactly like a parse failure.
    payload match {
      case handShake: PbPushDataHandShake =>
        pbResult(
          payload,
          Type.PUSH_DATA_HAND_SHAKE,
          handShake.getMode,
          handShake.getShuffleKey,
          handShake.getPartitionUniqueId,
          true)
      case regionStart: PbRegionStart =>
        pbResult(
          payload,
          Type.REGION_START,
          regionStart.getMode,
          regionStart.getShuffleKey,
          regionStart.getPartitionUniqueId,
          true)
      case regionFinish: PbRegionFinish =>
        pbResult(
          payload,
          Type.REGION_FINISH,
          regionFinish.getMode,
          regionFinish.getShuffleKey,
          regionFinish.getPartitionUniqueId,
          false)
      case segmentStart: PbSegmentStart =>
        pbResult(
          payload,
          Type.SEGMENT_START,
          segmentStart.getMode,
          segmentStart.getShuffleKey,
          segmentStart.getPartitionUniqueId,
          false)
    }
  } catch {
    case _: Exception =>
      // A fresh buffer is requested from the body for the second decoding attempt.
      val decoded = Message.decode(rpcRequest.body().nioByteBuffer())
      decoded match {
        case handShake: PushDataHandShake =>
          legacyResult(
            decoded,
            Type.PUSH_DATA_HAND_SHAKE,
            Mode.forNumber(handShake.mode),
            handShake.shuffleKey,
            handShake.partitionUniqueId,
            true)
        case regionStart: RegionStart =>
          legacyResult(
            decoded,
            Type.REGION_START,
            Mode.forNumber(regionStart.mode),
            regionStart.shuffleKey,
            regionStart.partitionUniqueId,
            true)
        case regionFinish: RegionFinish =>
          legacyResult(
            decoded,
            Type.REGION_FINISH,
            Mode.forNumber(regionFinish.mode),
            regionFinish.shuffleKey,
            regionFinish.partitionUniqueId,
            false)
      }
  }
}