override def receiveAndReply()

in master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala [412:602]


  /**
   * Builds the partial function that serves request messages expecting a reply.
   *
   * Each supported message is unpacked and handed to its `handle*` method through
   * `executeWithLeaderChecker`; the only exception is `ReleaseSlots`, which is answered
   * directly with `StatusCode.SUCCESS`.
   *
   * `checkAuth(context, appId)` is invoked before dispatch for `HeartbeatFromApplication`,
   * `RequestSlots`, `PbBatchUnregisterShuffles`, `PbUnregisterShuffle` and `ApplicationLost`.
   * No other case calls it.
   *
   * NOTE(review): the handler expression is written inline as the second argument of
   * `executeWithLeaderChecker`; whether it is evaluated lazily (by-name) depends on that
   * method's signature, which is not visible here. Values bound with `val` before the call
   * are always computed eagerly, so keep conversions that must run unconditionally there.
   *
   * @param context call context of the incoming request; used to reply and passed to handlers
   * @return the dispatcher over all request types supported by this endpoint
   */
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    // Application heartbeat: checkAuth first, then forward every field to the handler.
    case HeartbeatFromApplication(
          appId,
          written,
          files,
          fallbackCount,
          fallbackCounts,
          workersToCheck,
          requestId,
          replyExpected) =>
      logDebug(s"Received heartbeat from app $appId")
      checkAuth(context, appId)
      executeWithLeaderChecker(
        context,
        handleHeartbeatFromApplication(
          context,
          appId,
          written,
          files,
          fallbackCount,
          fallbackCounts,
          workersToCheck,
          requestId,
          replyExpected))

    // Worker registration: unpack the protobuf message into plain values up front.
    case pb: PbRegisterWorker =>
      val requestId = pb.getRequestId
      val host = pb.getHost
      val rpcPort = pb.getRpcPort
      val pushPort = pb.getPushPort
      val fetchPort = pb.getFetchPort
      val replicatePort = pb.getReplicatePort
      val internalPort = pb.getInternalPort
      val location = pb.getNetworkLocation
      // Key every reported disk by its mount point, then expose the result as a Java map.
      val diskEntries = pb.getDisksList.asScala.map { pbDisk =>
        (pbDisk.getMountPoint, PbSerDeUtils.fromPbDiskInfo(pbDisk))
      }
      val disks = diskEntries.toMap.asJava
      val resourceConsumption =
        PbSerDeUtils.fromPbUserResourceConsumption(pb.getUserResourceConsumptionMap)

      logDebug(s"Received RegisterWorker request $requestId, $host:$pushPort:$replicatePort" +
        s" $disks.")
      executeWithLeaderChecker(
        context,
        handleRegisterWorker(
          context,
          host,
          rpcPort,
          pushPort,
          fetchPort,
          replicatePort,
          internalPort,
          location,
          disks,
          resourceConsumption,
          requestId))

    // Retained for compatibility only: acknowledged with SUCCESS, no further processing.
    case ReleaseSlots(_, _, _, _, _) =>
      context.reply(ReleaseSlotsResponse(StatusCode.SUCCESS))

    // Slot request: only the application id is extracted here; the whole message is forwarded.
    case requestSlots @ RequestSlots(applicationId, _, _, _, _, _, _, _, _, _, _, _, _) =>
      logTrace(s"Received RequestSlots request $requestSlots.")
      checkAuth(context, applicationId)
      executeWithLeaderChecker(context, handleRequestSlots(context, requestSlots))

    // Unregister several shuffles of one application in a single request.
    case batchUnregister: PbBatchUnregisterShuffles =>
      val applicationId = batchUnregister.getAppId
      val shuffleIds = batchUnregister.getShuffleIdsList.asScala.toList
      val requestId = batchUnregister.getRequestId
      logDebug(s"Received BatchUnregisterShuffle request $requestId, $applicationId, $shuffleIds")
      checkAuth(context, applicationId)
      executeWithLeaderChecker(
        context,
        batchHandleUnregisterShuffles(context, applicationId, shuffleIds, requestId))

    // Unregister a single shuffle.
    case unregister: PbUnregisterShuffle =>
      val applicationId = unregister.getAppId
      val shuffleId = unregister.getShuffleId
      val requestId = unregister.getRequestId
      logDebug(s"Received UnregisterShuffle request $requestId, $applicationId, $shuffleId")
      checkAuth(context, applicationId)
      executeWithLeaderChecker(
        context,
        handleUnregisterShuffle(context, applicationId, shuffleId, requestId))

    case ApplicationLost(appId, requestId) =>
      logDebug(
        s"Received ApplicationLost request $requestId, $appId from ${context.senderAddress}.")
      checkAuth(context, appId)
      executeWithLeaderChecker(context, handleApplicationLost(context, appId, requestId))

    // Worker heartbeat: all fields are passed through to the handler unchanged.
    case HeartbeatFromWorker(
          host,
          rpcPort,
          pushPort,
          fetchPort,
          replicatePort,
          disks,
          resourceConsumption,
          activeShuffleKey,
          highWorkload,
          status,
          requestId) =>
      logDebug(s"Received heartbeat from" +
        s" worker $host:$rpcPort:$pushPort:$fetchPort:$replicatePort with $disks.")
      executeWithLeaderChecker(
        context,
        handleHeartbeatFromWorker(
          context,
          host,
          rpcPort,
          pushPort,
          fetchPort,
          replicatePort,
          disks,
          resourceConsumption,
          activeShuffleKey,
          highWorkload,
          status,
          requestId))

    // The type ascriptions below are part of the pattern: a typed pattern does not match null,
    // so they must stay even though they look redundant.
    case ReportWorkerUnavailable(unavailable: util.List[WorkerInfo], requestId: String) =>
      executeWithLeaderChecker(
        context,
        handleReportNodeUnavailable(context, unavailable, requestId))

    case ReportWorkerDecommission(decommissioned: util.List[WorkerInfo], requestId: String) =>
      executeWithLeaderChecker(
        context,
        handleWorkerDecommission(context, decommissioned, requestId))

    case revise: PbReviseLostShuffles =>
      executeWithLeaderChecker(
        context,
        handleReviseLostShuffle(
          context,
          revise.getAppId,
          revise.getLostShufflesList,
          revise.getRequestId))

    // Worker exclusion: both protobuf lists are converted and copied into fresh ArrayLists.
    case exclude: PbWorkerExclude =>
      val addedInfos =
        exclude.getWorkersToAddList.asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList
      val toAdd = new util.ArrayList[WorkerInfo](addedInfos.asJava)
      val removedInfos =
        exclude.getWorkersToRemoveList.asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList
      val toRemove = new util.ArrayList[WorkerInfo](removedInfos.asJava)

      executeWithLeaderChecker(
        context,
        handleWorkerExclude(context, toAdd, toRemove, exclude.getRequestId))

    case lost: PbWorkerLost =>
      val host = lost.getHost
      val rpcPort = lost.getRpcPort
      val pushPort = lost.getPushPort
      val fetchPort = lost.getFetchPort
      val replicatePort = lost.getReplicatePort
      val requestId = lost.getRequestId
      logInfo(s"Received worker lost $host:$rpcPort:$pushPort:$fetchPort:$replicatePort.")
      executeWithLeaderChecker(
        context,
        handleWorkerLost(context, host, rpcPort, pushPort, fetchPort, replicatePort, requestId))

    case CheckQuota(user) =>
      executeWithLeaderChecker(context, handleCheckQuota(user, context))

    case _: PbCheckWorkersAvailable =>
      executeWithLeaderChecker(context, handleCheckWorkersAvailable(context))

    // Worker event: the event type is forwarded as its protobuf enum number.
    case event: PbWorkerEventRequest =>
      val eventWorkerInfos =
        event.getWorkersList.asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList
      val eventWorkers = new util.ArrayList[WorkerInfo](eventWorkerInfos.asJava)
      executeWithLeaderChecker(
        context,
        handleWorkerEvent(
          event.getRequestId,
          event.getWorkerEventType.getNumber,
          eventWorkers,
          context))

    // Per the original author's note, this request is sent by a worker.
    case metaRequest: PbApplicationMetaRequest =>
      executeWithLeaderChecker(context, handleRequestForApplicationMeta(context, metaRequest))

    case remove: PbRemoveWorkersUnavailableInfo =>
      val convertedInfos =
        remove.getWorkerInfoList.asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList
      val unavailableWorkers = new util.ArrayList[WorkerInfo](convertedInfos.asJava)
      executeWithLeaderChecker(
        context,
        handleRemoveWorkersUnavailableInfos(context, unavailableWorkers, remove.getRequestId))
  }