in client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala [340:479]
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case pb: PbRegisterShuffle =>
val shuffleId = pb.getShuffleId
val numMappers = pb.getNumMappers
val numPartitions = pb.getNumPartitions
logDebug(s"Received RegisterShuffle request, " +
s"$shuffleId, $numMappers, $numPartitions.")
offerAndReserveSlots(
RegisterCallContext(context),
shuffleId,
numMappers,
numPartitions)
case pb: PbRegisterMapPartitionTask =>
val shuffleId = pb.getShuffleId
val numMappers = pb.getNumMappers
val mapId = pb.getMapId
val attemptId = pb.getAttemptId
val partitionId = pb.getPartitionId
val isSegmentGranularityVisible = pb.getIsSegmentGranularityVisible
logDebug(s"Received Register map partition task request, " +
s"$shuffleId, $numMappers, $mapId, $attemptId, $partitionId, $isSegmentGranularityVisible.")
shufflePartitionType.putIfAbsent(shuffleId, PartitionType.MAP)
offerAndReserveSlots(
RegisterCallContext(context, partitionId),
shuffleId,
numMappers,
numMappers,
partitionId,
isSegmentGranularityVisible)
case pb: PbRevive =>
val shuffleId = pb.getShuffleId
val mapIds = pb.getMapIdList
val partitionInfos = pb.getPartitionInfoList
val partitionIds = new util.ArrayList[Integer]()
val epochs = new util.ArrayList[Integer]()
val oldPartitions = new util.ArrayList[PartitionLocation]()
val causes = new util.ArrayList[StatusCode]()
(0 until partitionInfos.size()).foreach { idx =>
val info = partitionInfos.get(idx)
partitionIds.add(info.getPartitionId)
epochs.add(info.getEpoch)
if (info.hasPartition) {
oldPartitions.add(PbSerDeUtils.fromPbPartitionLocation(info.getPartition))
} else {
oldPartitions.add(null)
}
causes.add(StatusCode.fromValue(info.getStatus))
}
logDebug(s"Received Revive request, number of partitions ${partitionIds.size()}")
handleRevive(
context,
shuffleId,
mapIds,
partitionIds,
epochs,
oldPartitions,
causes)
case pb: PbPartitionSplit =>
val shuffleId = pb.getShuffleId
val partitionId = pb.getPartitionId
val epoch = pb.getEpoch
val oldPartition = PbSerDeUtils.fromPbPartitionLocation(pb.getOldPartition)
logTrace(s"Received split request, " +
s"$shuffleId, $partitionId, $epoch, $oldPartition")
changePartitionManager.handleRequestPartitionLocation(
ChangeLocationsCallContext(context, 1),
shuffleId,
partitionId,
epoch,
oldPartition,
isSegmentGranularityVisible = commitManager.isSegmentGranularityVisible(shuffleId))
case MapperEnd(shuffleId, mapId, attemptId, numMappers, partitionId, pushFailedBatch) =>
logTrace(s"Received MapperEnd TaskEnd request, " +
s"${Utils.makeMapKey(shuffleId, mapId, attemptId)}")
val partitionType = getPartitionType(shuffleId)
partitionType match {
case PartitionType.REDUCE =>
handleMapperEnd(context, shuffleId, mapId, attemptId, numMappers, pushFailedBatch)
case PartitionType.MAP =>
handleMapPartitionEnd(
context,
shuffleId,
mapId,
attemptId,
partitionId,
numMappers)
case _ =>
throw new UnsupportedOperationException(s"Not support $partitionType yet")
}
case GetReducerFileGroup(
shuffleId: Int,
isSegmentGranularityVisible: Boolean,
serdeVersion: SerdeVersion) =>
logDebug(
s"Received GetShuffleFileGroup request for shuffleId $shuffleId, isSegmentGranularityVisible $isSegmentGranularityVisible")
handleGetReducerFileGroup(context, shuffleId, isSegmentGranularityVisible, serdeVersion)
case pb: PbGetShuffleId =>
val appShuffleId = pb.getAppShuffleId
val appShuffleIdentifier = pb.getAppShuffleIdentifier
val isWriter = pb.getIsShuffleWriter
val isBarrierStage = pb.getIsBarrierStage
logDebug(s"Received GetShuffleId request, appShuffleId $appShuffleId " +
s"appShuffleIdentifier $appShuffleIdentifier isWriter $isWriter isBarrier $isBarrierStage.")
handleGetShuffleIdForApp(
context,
appShuffleId,
appShuffleIdentifier,
isWriter,
isBarrierStage)
case pb: PbReportShuffleFetchFailure =>
val appShuffleId = pb.getAppShuffleId
val shuffleId = pb.getShuffleId
val taskId = pb.getTaskId
logDebug(s"Received ReportShuffleFetchFailure request, appShuffleId $appShuffleId shuffleId $shuffleId taskId $taskId")
handleReportShuffleFetchFailure(context, appShuffleId, shuffleId, taskId)
case pb: PbReportBarrierStageAttemptFailure =>
val appShuffleId = pb.getAppShuffleId
val appShuffleIdentifier = pb.getAppShuffleIdentifier
logDebug(s"Received ReportBarrierStageAttemptFailure request, appShuffleId $appShuffleId, " +
s"appShuffleIdentifier $appShuffleIdentifier")
handleReportBarrierStageAttemptFailure(context, appShuffleId, appShuffleIdentifier)
case pb: PbApplicationMetaRequest =>
logDebug(s"Received request for meta info ${pb.getAppId}.")
if (applicationMeta == null) {
context.sendFailure(
new IllegalArgumentException("Application meta is not initialized for this app."))
} else {
context.reply(PbSerDeUtils.toPbApplicationMeta(applicationMeta))
}
}