in worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala [1179:1301]
/**
 * Handles one map-partition control RPC (PUSH_DATA_HAND_SHAKE, REGION_START, REGION_FINISH or
 * SEGMENT_START) for a single partition location and answers the caller through `callback`.
 *
 * Flow: choose the timer metric for the message type and role, look up the partition location,
 * start the timer, stop early if the location or its writer check fails, answer HARD_SPLIT when
 * the worker is shutting down or the disk-full/split check asks for it, otherwise forward the
 * event to the partition's writer and acknowledge with an empty payload.
 *
 * @param requestId id of the RPC; its string form is passed to the timer as the key
 * @param pbMsg protobuf form of the request, used when `isLegacy` is false and always for
 *              SEGMENT_START
 * @param msg legacy form of the request, used when `isLegacy` is true
 * @param isLegacy whether the handshake / region-start payload must be read from `msg`
 * @param messageType type of the request; only the four types listed above are accepted
 * @param mode `Mode.Primary` selects the primary location and metrics, anything else the
 *             replica ones
 * @param shuffleKey shuffle key used to look up the partition location
 * @param partitionUniqueId unique id used to look up the partition location
 * @param checkSplit whether to run the disk-full/split check for REGION_START and
 *                   PUSH_DATA_HAND_SHAKE
 * @param callback callback used to answer the RPC
 * @throws IllegalArgumentException if `messageType` is not one of the supported types
 */
private def handleMapPartitionRpcRequestCore(
    requestId: Long,
    pbMsg: GeneratedMessageV3,
    msg: Message,
    isLegacy: Boolean,
    messageType: Message.Type,
    mode: Mode,
    shuffleKey: String,
    partitionUniqueId: String,
    checkSplit: Boolean,
    callback: RpcResponseCallback): Unit = {
  // Guarded so the message is not formatted on every request when debug logging is off.
  if (log.isDebugEnabled) {
    log.debug(
      s"requestId:$requestId, pushdata rpc:$messageType, mode:$mode, shuffleKey:$shuffleKey, " +
        s"partitionUniqueId:$partitionUniqueId")
  }

  val isPrimary = mode == Mode.Primary

  // Unsupported message types are rejected here, before any timer is started.
  val (workerSourcePrimary, workerSourceReplica) =
    messageType match {
      case Type.PUSH_DATA_HAND_SHAKE =>
        (
          WorkerSource.PRIMARY_PUSH_DATA_HANDSHAKE_TIME,
          WorkerSource.REPLICA_PUSH_DATA_HANDSHAKE_TIME)
      case Type.REGION_START =>
        (WorkerSource.PRIMARY_REGION_START_TIME, WorkerSource.REPLICA_REGION_START_TIME)
      case Type.REGION_FINISH =>
        (WorkerSource.PRIMARY_REGION_FINISH_TIME, WorkerSource.REPLICA_REGION_FINISH_TIME)
      case Type.SEGMENT_START =>
        (WorkerSource.PRIMARY_SEGMENT_START_TIME, WorkerSource.REPLICA_SEGMENT_START_TIME)
      case _ => throw new IllegalArgumentException(s"Not support $messageType yet")
    }
  // Single source of truth for the metric used by both the timer and the wrapped callback.
  val timerName = if (isPrimary) workerSourcePrimary else workerSourceReplica

  val location =
    if (isPrimary) {
      partitionLocationInfo.getPrimaryLocation(shuffleKey, partitionUniqueId)
    } else {
      partitionLocationInfo.getReplicaLocation(shuffleKey, partitionUniqueId)
    }

  workerSource.startTimer(timerName, s"$requestId")
  val wrappedCallback =
    new WrappedRpcResponseCallback(
      messageType,
      isPrimary,
      requestId,
      null,
      location,
      timerName,
      callback)

  // NOTE(review): every early exit below, and the catch block at the end, answers through the
  // raw `callback` rather than `wrappedCallback`. If the wrapper is what stops the timer
  // started above, those paths leave it running - confirm against WrappedRpcResponseCallback.
  if (locationIsNull(
      messageType,
      shuffleKey,
      partitionUniqueId,
      location,
      callback)) return

  val fileWriter =
    getFileWriterAndCheck(messageType, location, isPrimary, callback) match {
      case (true, _) => return
      case (false, f: PartitionDataWriter) => f
    }

  // Answers the client with a single-byte HARD_SPLIT status.
  def replyHardSplit(): Unit =
    callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))

  // Only REGION_START and PUSH_DATA_HAND_SHAKE may be answered with HARD_SPLIT, and only when
  // the current file has partition split enabled.
  val isPartitionSplitEnabled = fileWriter.getCurrentFileInfo.isPartitionSplitEnabled
  val splitEligible =
    (messageType == Type.REGION_START || messageType == Type.PUSH_DATA_HAND_SHAKE) &&
      isPartitionSplitEnabled

  // During worker shutdown, the worker returns HARD_SPLIT for all existing partitions.
  // This must happen before any exception is returned, so that the current push request
  // revives and retries.
  if (shutdown.get() && splitEligible) {
    logInfo(s"$messageType return HARD_SPLIT for shuffle $shuffleKey since worker shutdown.")
    replyHardSplit()
    return
  }

  // checkDiskFullAndSplit is only consulted when the caller asked for the check and the
  // request is eligible, exactly as the short-circuit order requires.
  if (checkSplit && splitEligible &&
    checkDiskFullAndSplit(fileWriter, isPrimary) == StatusCode.HARD_SPLIT) {
    workerSource.incCounter(WorkerSource.WRITE_DATA_HARD_SPLIT_COUNT)
    replyHardSplit()
    return
  }

  try {
    messageType match {
      case Type.PUSH_DATA_HAND_SHAKE =>
        val pbPushDataHandShake: PbPushDataHandShake =
          if (isLegacy) {
            // Legacy requests are converted to the protobuf form the writer consumes.
            val legacyHandShake = msg.asInstanceOf[PushDataHandShake]
            PbPushDataHandShake.newBuilder()
              .setNumPartitions(legacyHandShake.numPartitions)
              .setBufferSize(legacyHandShake.bufferSize)
              .build()
          } else {
            pbMsg.asInstanceOf[PbPushDataHandShake]
          }
        fileWriter.handleEvents(pbPushDataHandShake)
      case Type.REGION_START =>
        val pbRegionStart: PbRegionStart =
          if (isLegacy) {
            val legacyRegionStart = msg.asInstanceOf[RegionStart]
            PbRegionStart.newBuilder()
              .setCurrentRegionIndex(legacyRegionStart.currentRegionIndex)
              .setIsBroadcast(legacyRegionStart.isBroadcast)
              .build()
          } else {
            pbMsg.asInstanceOf[PbRegionStart]
          }
        fileWriter.handleEvents(pbRegionStart)
      case Type.REGION_FINISH =>
        // An empty PbRegionFinish is built regardless of `isLegacy`.
        val pbRegionFinish: PbRegionFinish = PbRegionFinish.newBuilder().build()
        fileWriter.handleEvents(pbRegionFinish)
      case Type.SEGMENT_START =>
        fileWriter.handleEvents(pbMsg)
      case _ => throw new IllegalArgumentException(s"Not support $messageType yet")
    }
    // TODO replica: forwarding these events from the primary to its peer is not implemented
    // yet, so a primary with a peer acknowledges with an empty payload like every other case.
    wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
  } catch {
    case t: Throwable =>
      // Any failure while handling the event is reported to the client, wrapped with the
      // message type for context.
      callback.onFailure(new CelebornIOException(s"$messageType failed", t))
  }
}