in worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala [858:945]
/**
 * Processes one control RPC (`PUSH_DATA_HAND_SHAKE`, `REGION_START` or `REGION_FINISH`)
 * by forwarding it to the `MapPartitionFileWriter` of the addressed partition and then
 * acknowledging the request with an empty payload.
 *
 * Flow: resolve the partition location (primary or replica, depending on `mode`), start
 * the per-request timer, bail out early if the location is missing, the file writer check
 * reports a problem, or (when `checkSplit` is set) the disk-full/split check fires, and
 * finally apply the message to the writer.
 *
 * @param mode              encoded partition mode; decoded with `PartitionLocation.getMode`
 * @param message           the RPC message; its `type()` selects the writer operation
 * @param shuffleKey        shuffle identifier used for the location lookup
 * @param partitionUniqueId partition identifier used for the location lookup
 * @param requestId         request id; its string form is the timer key
 * @param checkSplit        whether `checkDiskFullAndSplit` is consulted before writing
 * @param callback          caller's callback; receives failures raised while applying the
 *                          message to the writer
 * @throws IllegalArgumentException if the message type is not one of the three supported
 *                                  types (raised before any timer is started)
 */
private def handleRpcRequestCore(
    mode: Byte,
    message: Message,
    shuffleKey: String,
    partitionUniqueId: String,
    requestId: Long,
    checkSplit: Boolean,
    callback: RpcResponseCallback): Unit = {
  val isPrimary = PartitionLocation.getMode(mode) == PartitionLocation.Mode.PRIMARY
  val messageType = message.`type`()
  log.debug(
    s"requestId:$requestId, pushdata rpc:$messageType, mode:$mode, shuffleKey:$shuffleKey, " +
      s"partitionUniqueId:$partitionUniqueId")

  // Pick the metric matching both the message type and the side (primary/replica) once,
  // so the timer and the wrapped callback are guaranteed to use the same name.
  val timerName = messageType match {
    case Type.PUSH_DATA_HAND_SHAKE =>
      if (isPrimary) WorkerSource.PRIMARY_PUSH_DATA_HANDSHAKE_TIME
      else WorkerSource.REPLICA_PUSH_DATA_HANDSHAKE_TIME
    case Type.REGION_START =>
      if (isPrimary) WorkerSource.PRIMARY_REGION_START_TIME
      else WorkerSource.REPLICA_REGION_START_TIME
    case Type.REGION_FINISH =>
      if (isPrimary) WorkerSource.PRIMARY_REGION_FINISH_TIME
      else WorkerSource.REPLICA_REGION_FINISH_TIME
    case _ => throw new IllegalArgumentException(s"Not support $messageType yet")
  }

  val location =
    if (isPrimary) partitionLocationInfo.getPrimaryLocation(shuffleKey, partitionUniqueId)
    else partitionLocationInfo.getReplicaLocation(shuffleKey, partitionUniqueId)

  workerSource.startTimer(timerName, s"$requestId")
  val wrappedCallback = new WrappedRpcResponseCallback(
    messageType,
    isPrimary,
    requestId,
    null,
    location,
    timerName,
    callback)

  // Guard clauses: each helper returns true when the request must not proceed.
  val missingLocation = locationIsNull(
    messageType,
    shuffleKey,
    partitionUniqueId,
    null,
    location,
    callback,
    wrappedCallback)
  if (missingLocation) return

  val fileWriter =
    getFileWriterAndCheck(messageType, location, isPrimary, callback) match {
      case (true, _) => return
      case (false, writer: FileWriter) => writer
    }

  if (checkSplit && checkDiskFullAndSplit(fileWriter, isPrimary, null, callback)) return

  try {
    val mapWriter = fileWriter.asInstanceOf[MapPartitionFileWriter]
    messageType match {
      case Type.PUSH_DATA_HAND_SHAKE =>
        val handShake = message.asInstanceOf[PushDataHandShake]
        mapWriter.pushDataHandShake(handShake.numPartitions, handShake.bufferSize)
      case Type.REGION_START =>
        val regionStart = message.asInstanceOf[RegionStart]
        mapWriter.regionStart(regionStart.currentRegionIndex, regionStart.isBroadcast)
      case Type.REGION_FINISH =>
        mapWriter.regionFinish()
      case _ => throw new IllegalArgumentException(s"Not support $messageType yet")
    }

    val emptyAck = ByteBuffer.wrap(Array[Byte]())
    // for primary, send data to replica
    if (location.hasPeer && isPrimary) {
      // TODO replica: forwarding to the peer is not implemented yet, so this branch
      // currently acknowledges exactly like the non-replicated case below.
      wrappedCallback.onSuccess(emptyAck)
    } else {
      wrappedCallback.onSuccess(emptyAck)
    }
  } catch {
    // NOTE(review): the failure is reported through the raw `callback`, not
    // `wrappedCallback`. If the wrapper is what stops the timer started above, the timer
    // is presumably left running on this path -- confirm against WrappedRpcResponseCallback.
    case t: Throwable =>
      callback.onFailure(new CelebornIOException(s"$messageType failed", t))
  }
}