in client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala [203:299]
/**
 * Dispatches incoming request messages to their handlers.
 *
 * Every branch hands `context` to the handler it invokes, either directly or
 * wrapped in a call-context object (`RegisterCallContext`,
 * `ChangeLocationsCallContext`).
 *
 * Messages handled here:
 *  - `PbRegisterShuffle`          -> `offerAndReserveSlots`
 *  - `PbRegisterMapPartitionTask` -> `offerAndReserveSlots` (records the shuffle as
 *                                    `PartitionType.MAP` first)
 *  - `PbRevive`                   -> `handleRevive`
 *  - `PbPartitionSplit`           -> `changePartitionManager.handleRequestPartitionLocation`
 *  - `MapperEnd`                  -> `handleMapperEnd` or `handleMapPartitionEnd`,
 *                                    chosen by the shuffle's partition type
 *  - `GetReducerFileGroup`        -> `handleGetReducerFileGroup`
 *
 * @param context call context of the request being processed
 * @return a partial function defined for the message types listed above
 * @throws UnsupportedOperationException on `MapperEnd` when the shuffle's partition
 *                                       type is neither `REDUCE` nor `MAP`
 */
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case pb: PbRegisterShuffle =>
    val shuffleId = pb.getShuffleId
    val numMappers = pb.getNumMappers
    val numPartitions = pb.getNumPartitions
    logDebug(s"Received RegisterShuffle request, " +
      s"$shuffleId, $numMappers, $numPartitions.")
    offerAndReserveSlots(
      RegisterCallContext(context),
      shuffleId,
      numMappers,
      numPartitions)

  case pb: PbRegisterMapPartitionTask =>
    val shuffleId = pb.getShuffleId
    val numMappers = pb.getNumMappers
    val mapId = pb.getMapId
    val attemptId = pb.getAttemptId
    val partitionId = pb.getPartitionId
    logDebug(s"Received Register map partition task request, " +
      s"$shuffleId, $numMappers, $mapId, $attemptId, $partitionId.")
    // Only the first registration of a shuffle records its partition type.
    shufflePartitionType.putIfAbsent(shuffleId, PartitionType.MAP)
    // numMappers is passed twice on purpose: the second occurrence fills the
    // argument slot that numPartitions occupies in the PbRegisterShuffle branch.
    // NOTE(review): presumably a MAP-type shuffle has one partition per mapper - confirm.
    offerAndReserveSlots(
      RegisterCallContext(context, partitionId),
      shuffleId,
      numMappers,
      numMappers,
      partitionId)

  case pb: PbRevive =>
    val shuffleId = pb.getShuffleId
    val mapIds = pb.getMapIdList
    val partitionInfos = pb.getPartitionInfoList

    // Unpack the per-partition infos into four parallel lists; index i of each
    // list describes the same partition.
    val partitionIds = new util.ArrayList[Integer]()
    val epochs = new util.ArrayList[Integer]()
    val oldPartitions = new util.ArrayList[PartitionLocation]()
    val causes = new util.ArrayList[StatusCode]()
    (0 until partitionInfos.size()).foreach { idx =>
      val info = partitionInfos.get(idx)
      partitionIds.add(info.getPartitionId)
      epochs.add(info.getEpoch)
      if (info.hasPartition) {
        oldPartitions.add(PbSerDeUtils.fromPbPartitionLocation(info.getPartition))
      } else {
        // Keep a null placeholder so the lists stay index-aligned.
        oldPartitions.add(null)
      }
      causes.add(Utils.toStatusCode(info.getStatus))
    }
    // Receipt of a request is diagnostic information, not a fault: log it at debug
    // level like the other branches instead of emitting a warning per request.
    logDebug(s"Received Revive request, number of partitions ${partitionIds.size()}")
    handleRevive(
      context,
      shuffleId,
      mapIds,
      partitionIds,
      epochs,
      oldPartitions,
      causes)

  case pb: PbPartitionSplit =>
    val shuffleId = pb.getShuffleId
    val partitionId = pb.getPartitionId
    val epoch = pb.getEpoch
    val oldPartition = PbSerDeUtils.fromPbPartitionLocation(pb.getOldPartition)
    logTrace(s"Received split request, " +
      s"$shuffleId, $partitionId, $epoch, $oldPartition")
    // NOTE(review): the literal 1 presumably is the number of partitions this
    // request covers - confirm against ChangeLocationsCallContext.
    changePartitionManager.handleRequestPartitionLocation(
      ChangeLocationsCallContext(context, 1),
      shuffleId,
      partitionId,
      epoch,
      oldPartition)

  case MapperEnd(shuffleId, mapId, attemptId, numMappers, partitionId) =>
    logTrace(s"Received MapperEnd TaskEnd request, " +
      s"${Utils.makeMapKey(shuffleId, mapId, attemptId)}")
    val partitionType = getPartitionType(shuffleId)
    partitionType match {
      case PartitionType.REDUCE =>
        handleMapperEnd(context, shuffleId, mapId, attemptId, numMappers)
      case PartitionType.MAP =>
        handleMapPartitionEnd(
          context,
          shuffleId,
          mapId,
          attemptId,
          partitionId,
          numMappers)
      case _ =>
        throw new UnsupportedOperationException(s"Not support $partitionType yet")
    }

  case GetReducerFileGroup(shuffleId: Int) =>
    // The separator after "request," was missing, which glued the two fragments
    // together in the emitted message.
    logDebug(s"Received GetShuffleFileGroup request, " +
      s"shuffleId $shuffleId.")
    handleGetReducerFileGroup(context, shuffleId)
}