private def handleRpcRequestCore()

in worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala [858:945]


  private def handleRpcRequestCore(
      mode: Byte,
      message: Message,
      shuffleKey: String,
      partitionUniqueId: String,
      requestId: Long,
      checkSplit: Boolean,
      callback: RpcResponseCallback): Unit = {
    val isPrimary = PartitionLocation.getMode(mode) == PartitionLocation.Mode.PRIMARY
    val messageType = message.`type`()
    log.debug(
      s"requestId:$requestId, pushdata rpc:$messageType, mode:$mode, shuffleKey:$shuffleKey, " +
        s"partitionUniqueId:$partitionUniqueId")
    val (workerSourcePrimary, workerSourceReplica) =
      messageType match {
        case Type.PUSH_DATA_HAND_SHAKE =>
          (
            WorkerSource.PRIMARY_PUSH_DATA_HANDSHAKE_TIME,
            WorkerSource.REPLICA_PUSH_DATA_HANDSHAKE_TIME)
        case Type.REGION_START =>
          (WorkerSource.PRIMARY_REGION_START_TIME, WorkerSource.REPLICA_REGION_START_TIME)
        case Type.REGION_FINISH =>
          (WorkerSource.PRIMARY_REGION_FINISH_TIME, WorkerSource.REPLICA_REGION_FINISH_TIME)
        case _ => throw new IllegalArgumentException(s"Not support $messageType yet")
      }

    val location =
      if (isPrimary) {
        partitionLocationInfo.getPrimaryLocation(shuffleKey, partitionUniqueId)
      } else {
        partitionLocationInfo.getReplicaLocation(shuffleKey, partitionUniqueId)
      }
    workerSource.startTimer(
      if (isPrimary) workerSourcePrimary else workerSourceReplica,
      s"$requestId")
    val wrappedCallback =
      new WrappedRpcResponseCallback(
        messageType,
        isPrimary,
        requestId,
        null,
        location,
        if (isPrimary) workerSourcePrimary else workerSourceReplica,
        callback)

    if (locationIsNull(
        messageType,
        shuffleKey,
        partitionUniqueId,
        null,
        location,
        callback,
        wrappedCallback)) return

    val fileWriter =
      getFileWriterAndCheck(messageType, location, isPrimary, callback) match {
        case (true, _) => return
        case (false, f: FileWriter) => f
      }

    if (checkSplit && checkDiskFullAndSplit(fileWriter, isPrimary, null, callback)) return

    try {
      messageType match {
        case Type.PUSH_DATA_HAND_SHAKE =>
          fileWriter.asInstanceOf[MapPartitionFileWriter].pushDataHandShake(
            message.asInstanceOf[PushDataHandShake].numPartitions,
            message.asInstanceOf[PushDataHandShake].bufferSize)
        case Type.REGION_START =>
          fileWriter.asInstanceOf[MapPartitionFileWriter].regionStart(
            message.asInstanceOf[RegionStart].currentRegionIndex,
            message.asInstanceOf[RegionStart].isBroadcast)
        case Type.REGION_FINISH =>
          fileWriter.asInstanceOf[MapPartitionFileWriter].regionFinish()
        case _ => throw new IllegalArgumentException(s"Not support $messageType yet")
      }
      // for primary , send data to replica
      if (location.hasPeer && isPrimary) {
        // TODO replica
        wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
      } else {
        wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
      }
    } catch {
      case t: Throwable =>
        callback.onFailure(new CelebornIOException(s"$messageType failed", t))
    }
  }