private def handleMapPartitionRpcRequestCore()

in worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala [1179:1301]


  /**
   * Core handler for the map-partition control RPCs PUSH_DATA_HAND_SHAKE, REGION_START,
   * REGION_FINISH and SEGMENT_START.
   *
   * Steps, in order:
   *  1. Look up the primary or replica location for `shuffleKey` / `partitionUniqueId`.
   *  2. Start the per-request timer for the metric matching `messageType` and `mode`.
   *  3. Return early if `locationIsNull` or `getFileWriterAndCheck` report a problem
   *     (both are handed `callback`).
   *  4. For REGION_START / PUSH_DATA_HAND_SHAKE on a split-enabled partition, answer
   *     HARD_SPLIT when the worker is shutting down, or when `checkSplit` is set and
   *     `checkDiskFullAndSplit` returns HARD_SPLIT.
   *  5. Otherwise pass the (possibly converted) protobuf event to the partition writer and
   *     acknowledge with an empty payload.
   *
   * Every path that replies through the raw `callback` stops the request timer explicitly,
   * because the timer name and key are otherwise only known to `wrappedCallback`.
   *
   * @param requestId         id of the request; used in the debug log and as the timer key
   * @param pbMsg             protobuf message, used when `isLegacy` is false and for SEGMENT_START
   * @param msg               legacy message, read only when `isLegacy` is true
   * @param isLegacy          whether the request arrived as a legacy `Message` that must be
   *                          converted to its protobuf form
   * @param messageType       RPC type; selects the timer metric and the event sent to the writer
   * @param mode              `Mode.Primary` selects the primary location and metric, any other
   *                          value the replica ones
   * @param shuffleKey        shuffle the partition belongs to
   * @param partitionUniqueId unique id of the partition location
   * @param checkSplit        whether to run the disk-full split check
   * @param callback          receives the response or the failure
   * @throws IllegalArgumentException if `messageType` is not one of the four supported types;
   *                                  thrown before any timer is started or callback invoked
   */
  private def handleMapPartitionRpcRequestCore(
      requestId: Long,
      pbMsg: GeneratedMessageV3,
      msg: Message,
      isLegacy: Boolean,
      messageType: Message.Type,
      mode: Mode,
      shuffleKey: String,
      partitionUniqueId: String,
      checkSplit: Boolean,
      callback: RpcResponseCallback): Unit = {
    // Goes through the logging helper, like logInfo below, instead of the raw logger.
    logDebug(
      s"requestId:$requestId, pushdata rpc:$messageType, mode:$mode, shuffleKey:$shuffleKey, " +
        s"partitionUniqueId:$partitionUniqueId")
    val isPrimary = mode == Mode.Primary
    val (workerSourcePrimary, workerSourceReplica) =
      messageType match {
        case Type.PUSH_DATA_HAND_SHAKE =>
          (
            WorkerSource.PRIMARY_PUSH_DATA_HANDSHAKE_TIME,
            WorkerSource.REPLICA_PUSH_DATA_HANDSHAKE_TIME)
        case Type.REGION_START =>
          (WorkerSource.PRIMARY_REGION_START_TIME, WorkerSource.REPLICA_REGION_START_TIME)
        case Type.REGION_FINISH =>
          (WorkerSource.PRIMARY_REGION_FINISH_TIME, WorkerSource.REPLICA_REGION_FINISH_TIME)
        case Type.SEGMENT_START =>
          (WorkerSource.PRIMARY_SEGMENT_START_TIME, WorkerSource.REPLICA_SEGMENT_START_TIME)
        case _ => throw new IllegalArgumentException(s"Not support $messageType yet")
      }
    // Metric name and key identifying this request's timer; computed once and shared by
    // startTimer, the wrapped callback and the explicit stops below.
    val timerName = if (isPrimary) workerSourcePrimary else workerSourceReplica
    val timerKey = s"$requestId"

    val location =
      if (isPrimary) {
        partitionLocationInfo.getPrimaryLocation(shuffleKey, partitionUniqueId)
      } else {
        partitionLocationInfo.getReplicaLocation(shuffleKey, partitionUniqueId)
      }
    workerSource.startTimer(timerName, timerKey)
    // NOTE(review): wrappedCallback is given the timer name and request id, so it presumably
    // stops the timer when it completes -- confirm against WrappedRpcResponseCallback.
    val wrappedCallback =
      new WrappedRpcResponseCallback(
        messageType,
        isPrimary,
        requestId,
        null,
        location,
        timerName,
        callback)

    // Stops the timer on paths that reply through the raw `callback` and therefore never
    // reach wrappedCallback. Always called after the reply has been handed over, so a problem
    // while stopping the timer cannot suppress the response.
    def stopRequestTimer(): Unit = workerSource.stopTimer(timerName, timerKey)

    if (locationIsNull(
        messageType,
        shuffleKey,
        partitionUniqueId,
        location,
        callback)) {
      stopRequestTimer()
      return
    }

    val fileWriter =
      getFileWriterAndCheck(messageType, location, isPrimary, callback) match {
        case (true, _) =>
          stopRequestTimer()
          return
        case (false, f: PartitionDataWriter) => f
      }

    // During worker shutdown, worker will return HARD_SPLIT for all existing partitions.
    // This must happen before returning an exception so that the current push request is
    // revived and retried.
    val isPartitionSplitEnabled = fileWriter.getCurrentFileInfo.isPartitionSplitEnabled
    // Only these two request types are answered with HARD_SPLIT.
    val canRequestSplit =
      messageType == Type.REGION_START || messageType == Type.PUSH_DATA_HAND_SHAKE

    if (shutdown.get() && canRequestSplit && isPartitionSplitEnabled) {
      logInfo(s"$messageType return HARD_SPLIT for shuffle $shuffleKey since worker shutdown.")
      callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
      stopRequestTimer()
      return
    }

    // checkDiskFullAndSplit stays last in the && chain so it only runs when every cheaper
    // precondition already holds.
    if (checkSplit && canRequestSplit && isPartitionSplitEnabled && checkDiskFullAndSplit(
        fileWriter,
        isPrimary) == StatusCode.HARD_SPLIT) {
      workerSource.incCounter(WorkerSource.WRITE_DATA_HARD_SPLIT_COUNT)
      callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
      stopRequestTimer()
      return
    }

    try {
      messageType match {
        case Type.PUSH_DATA_HAND_SHAKE =>
          val pbPushDataHandShake: PbPushDataHandShake =
            if (isLegacy) {
              val legacyHandShake = msg.asInstanceOf[PushDataHandShake]
              PbPushDataHandShake.newBuilder()
                .setNumPartitions(legacyHandShake.numPartitions)
                .setBufferSize(legacyHandShake.bufferSize)
                .build()
            } else {
              pbMsg.asInstanceOf[PbPushDataHandShake]
            }
          fileWriter.handleEvents(pbPushDataHandShake)
        case Type.REGION_START =>
          val pbRegionStart: PbRegionStart =
            if (isLegacy) {
              val legacyRegionStart = msg.asInstanceOf[RegionStart]
              PbRegionStart.newBuilder()
                .setCurrentRegionIndex(legacyRegionStart.currentRegionIndex)
                .setIsBroadcast(legacyRegionStart.isBroadcast)
                .build()
            } else {
              pbMsg.asInstanceOf[PbRegionStart]
            }
          fileWriter.handleEvents(pbRegionStart)
        case Type.REGION_FINISH =>
          // The writer is sent an empty event; no field of the incoming message is copied.
          val pbRegionFinish: PbRegionFinish = PbRegionFinish.newBuilder().build()
          fileWriter.handleEvents(pbRegionFinish)
        case Type.SEGMENT_START =>
          fileWriter.handleEvents(pbMsg)
        case _ => throw new IllegalArgumentException(s"Not support $messageType yet")
      }
      // TODO replica: a primary whose location has a peer should send the event to the
      // replica. Until that exists, primary and replica both acknowledge right away.
      wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
    } catch {
      // Deliberately broad so that the client gets a failure response whatever went wrong.
      case t: Throwable =>
        callback.onFailure(new CelebornIOException(s"$messageType failed", t))
        // NOTE(review): if the exception came from wrappedCallback.onSuccess itself, the timer
        // may already have been stopped; assumes a repeated stop for the same key is
        // harmless -- confirm.
        stopRequestTimer()
    }
  }