override def receiveAndReply()

in client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala [340:479]


  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case pb: PbRegisterShuffle =>
      val shuffleId = pb.getShuffleId
      val numMappers = pb.getNumMappers
      val numPartitions = pb.getNumPartitions
      logDebug(s"Received RegisterShuffle request, " +
        s"$shuffleId, $numMappers, $numPartitions.")
      offerAndReserveSlots(
        RegisterCallContext(context),
        shuffleId,
        numMappers,
        numPartitions)

    case pb: PbRegisterMapPartitionTask =>
      val shuffleId = pb.getShuffleId
      val numMappers = pb.getNumMappers
      val mapId = pb.getMapId
      val attemptId = pb.getAttemptId
      val partitionId = pb.getPartitionId
      val isSegmentGranularityVisible = pb.getIsSegmentGranularityVisible
      logDebug(s"Received Register map partition task request, " +
        s"$shuffleId, $numMappers, $mapId, $attemptId, $partitionId, $isSegmentGranularityVisible.")
      shufflePartitionType.putIfAbsent(shuffleId, PartitionType.MAP)
      offerAndReserveSlots(
        RegisterCallContext(context, partitionId),
        shuffleId,
        numMappers,
        numMappers,
        partitionId,
        isSegmentGranularityVisible)

    case pb: PbRevive =>
      val shuffleId = pb.getShuffleId
      val mapIds = pb.getMapIdList
      val partitionInfos = pb.getPartitionInfoList

      val partitionIds = new util.ArrayList[Integer]()
      val epochs = new util.ArrayList[Integer]()
      val oldPartitions = new util.ArrayList[PartitionLocation]()
      val causes = new util.ArrayList[StatusCode]()
      (0 until partitionInfos.size()).foreach { idx =>
        val info = partitionInfos.get(idx)
        partitionIds.add(info.getPartitionId)
        epochs.add(info.getEpoch)
        if (info.hasPartition) {
          oldPartitions.add(PbSerDeUtils.fromPbPartitionLocation(info.getPartition))
        } else {
          oldPartitions.add(null)
        }
        causes.add(StatusCode.fromValue(info.getStatus))
      }
      logDebug(s"Received Revive request, number of partitions ${partitionIds.size()}")
      handleRevive(
        context,
        shuffleId,
        mapIds,
        partitionIds,
        epochs,
        oldPartitions,
        causes)

    case pb: PbPartitionSplit =>
      val shuffleId = pb.getShuffleId
      val partitionId = pb.getPartitionId
      val epoch = pb.getEpoch
      val oldPartition = PbSerDeUtils.fromPbPartitionLocation(pb.getOldPartition)
      logTrace(s"Received split request, " +
        s"$shuffleId, $partitionId, $epoch, $oldPartition")
      changePartitionManager.handleRequestPartitionLocation(
        ChangeLocationsCallContext(context, 1),
        shuffleId,
        partitionId,
        epoch,
        oldPartition,
        isSegmentGranularityVisible = commitManager.isSegmentGranularityVisible(shuffleId))

    case MapperEnd(shuffleId, mapId, attemptId, numMappers, partitionId, pushFailedBatch) =>
      logTrace(s"Received MapperEnd TaskEnd request, " +
        s"${Utils.makeMapKey(shuffleId, mapId, attemptId)}")
      val partitionType = getPartitionType(shuffleId)
      partitionType match {
        case PartitionType.REDUCE =>
          handleMapperEnd(context, shuffleId, mapId, attemptId, numMappers, pushFailedBatch)
        case PartitionType.MAP =>
          handleMapPartitionEnd(
            context,
            shuffleId,
            mapId,
            attemptId,
            partitionId,
            numMappers)
        case _ =>
          throw new UnsupportedOperationException(s"Not support $partitionType yet")
      }

    case GetReducerFileGroup(
          shuffleId: Int,
          isSegmentGranularityVisible: Boolean,
          serdeVersion: SerdeVersion) =>
      logDebug(
        s"Received GetShuffleFileGroup request for shuffleId $shuffleId, isSegmentGranularityVisible $isSegmentGranularityVisible")
      handleGetReducerFileGroup(context, shuffleId, isSegmentGranularityVisible, serdeVersion)

    case pb: PbGetShuffleId =>
      val appShuffleId = pb.getAppShuffleId
      val appShuffleIdentifier = pb.getAppShuffleIdentifier
      val isWriter = pb.getIsShuffleWriter
      val isBarrierStage = pb.getIsBarrierStage
      logDebug(s"Received GetShuffleId request, appShuffleId $appShuffleId " +
        s"appShuffleIdentifier $appShuffleIdentifier isWriter $isWriter isBarrier $isBarrierStage.")
      handleGetShuffleIdForApp(
        context,
        appShuffleId,
        appShuffleIdentifier,
        isWriter,
        isBarrierStage)

    case pb: PbReportShuffleFetchFailure =>
      val appShuffleId = pb.getAppShuffleId
      val shuffleId = pb.getShuffleId
      val taskId = pb.getTaskId
      logDebug(s"Received ReportShuffleFetchFailure request, appShuffleId $appShuffleId shuffleId $shuffleId taskId $taskId")
      handleReportShuffleFetchFailure(context, appShuffleId, shuffleId, taskId)

    case pb: PbReportBarrierStageAttemptFailure =>
      val appShuffleId = pb.getAppShuffleId
      val appShuffleIdentifier = pb.getAppShuffleIdentifier
      logDebug(s"Received ReportBarrierStageAttemptFailure request, appShuffleId $appShuffleId, " +
        s"appShuffleIdentifier $appShuffleIdentifier")
      handleReportBarrierStageAttemptFailure(context, appShuffleId, appShuffleIdentifier)

    case pb: PbApplicationMetaRequest =>
      logDebug(s"Received request for meta info ${pb.getAppId}.")
      if (applicationMeta == null) {
        context.sendFailure(
          new IllegalArgumentException("Application meta is not initialized for this app."))
      } else {
        context.reply(PbSerDeUtils.toPbApplicationMeta(applicationMeta))
      }
  }