def handleMapPartitionPushData()

in worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala [979:1061]


  def handleMapPartitionPushData(pushData: PushData, callback: RpcResponseCallback): Unit = {
    val shuffleKey = pushData.shuffleKey
    val mode = PartitionLocation.getMode(pushData.mode)
    val body = pushData.body.asInstanceOf[NettyManagedBuffer].getBuf
    val isPrimary = mode == PartitionLocation.Mode.PRIMARY

    val key = s"${pushData.requestId}"
    if (isPrimary) {
      workerSource.startTimer(WorkerSource.PRIMARY_PUSH_DATA_TIME, key)
    } else {
      workerSource.startTimer(WorkerSource.REPLICA_PUSH_DATA_TIME, key)
    }

    // find FileWriter responsible for the data
    val location =
      if (isPrimary) {
        partitionLocationInfo.getPrimaryLocation(shuffleKey, pushData.partitionUniqueId)
      } else {
        partitionLocationInfo.getReplicaLocation(shuffleKey, pushData.partitionUniqueId)
      }

    val wrappedCallback =
      new WrappedRpcResponseCallback(
        pushData.`type`(),
        isPrimary,
        pushData.requestId,
        null,
        location,
        if (isPrimary) WorkerSource.PRIMARY_PUSH_DATA_TIME else WorkerSource.REPLICA_PUSH_DATA_TIME,
        callback)

    if (locationIsNull(
        pushData.`type`(),
        shuffleKey,
        pushData.partitionUniqueId,
        location,
        callback)) return

    val fileWriter =
      getFileWriterAndCheck(pushData.`type`(), location, isPrimary, callback) match {
        case (true, _) => return
        case (false, f: PartitionDataWriter) => f
      }

    // for mappartition we will not check whether disk full or split partition

    fileWriter.incrementPendingWrites()

    if (fileWriter.isClosed) {
      val fileInfo = fileWriter.getCurrentFileInfo
      logWarning(
        s"[handleMapPartitionPushData] FileWriter is already closed! File path ${fileInfo.getFilePath} " +
          s"length ${fileInfo.getFileLength}")
      callback.onFailure(new CelebornIOException("File already closed!"))
      fileWriter.decrementPendingWrites()
      return
    }
    val writePromise = Promise[Array[StatusCode]]()
    writeLocalData(Seq(fileWriter), body, shuffleKey, isPrimary, None, writePromise)
    // for primary, send data to replica
    if (location.hasPeer && isPrimary) {
      // to do
      Try(Await.result(writePromise.future, Duration.Inf)) match {
        case Success(result) =>
          if (result(0) != StatusCode.SUCCESS) {
            wrappedCallback.onFailure(new CelebornIOException("Write data failed!"))
          } else {
            wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
          }
        case Failure(e) => wrappedCallback.onFailure(e)
      }
    } else {
      Try(Await.result(writePromise.future, Duration.Inf)) match {
        case Success(result) =>
          if (result(0) != StatusCode.SUCCESS) {
            wrappedCallback.onFailure(new CelebornIOException("Write data failed!"))
          } else {
            wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
          }
        case Failure(e) => wrappedCallback.onFailure(e)
      }
    }
  }