def handlePushData()

in worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala [144:425]


  def handlePushData(pushData: PushData, callback: RpcResponseCallback): Unit = {
    val shuffleKey = pushData.shuffleKey
    val mode = PartitionLocation.getMode(pushData.mode)
    val body = pushData.body.asInstanceOf[NettyManagedBuffer].getBuf
    val isPrimary = mode == PartitionLocation.Mode.PRIMARY

    // For test
    if (isPrimary && testPushPrimaryDataTimeout &&
      !PushDataHandler.pushPrimaryDataTimeoutTested.getAndSet(true)) {
      return
    }

    if (!isPrimary && testPushReplicaDataTimeout &&
      !PushDataHandler.pushReplicaDataTimeoutTested.getAndSet(true)) {
      return
    }

    val key = s"${pushData.requestId}"
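    // Wrap the callback so the push is timed under the primary or replica push-data metric.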
    val callbackWithTimer =
      if (isPrimary) {
        new RpcResponseCallbackWithTimer(
          workerSource,
          WorkerSource.PRIMARY_PUSH_DATA_TIME,
          key,
          callback)
      } else {
        new RpcResponseCallbackWithTimer(
          workerSource,
          WorkerSource.REPLICA_PUSH_DATA_TIME,
          key,
          callback)
      }

    // find FileWriter responsible for the data
    val location =
      if (isPrimary) {
        partitionLocationInfo.getPrimaryLocation(shuffleKey, pushData.partitionUniqueId)
      } else {
        partitionLocationInfo.getReplicaLocation(shuffleKey, pushData.partitionUniqueId)
      }

    // Fetching the real batchId from the body would add extra cost and is meaningless for replication.
    val doReplicate = location != null && location.hasPeer && isPrimary
    var softSplit = false

    if (location == null) {
      val (mapId, attemptId) = getMapAttempt(body)
      // MapperAttempts for a shuffle exist after any CommitFiles request succeeds.
      // A shuffle can trigger multiple CommitFiles requests, for reasons such as HARD_SPLIT or StageEnd.
      // If MapperAttempts exists but the value for this mapId is -1 (-1 means the map has not finished yet),
      // it's probably because CommitFiles for HARD_SPLIT happened.
      if (shuffleMapperAttempts.containsKey(shuffleKey)) {
        if (-1 != shuffleMapperAttempts.get(shuffleKey).get(mapId)) {
          // partition data has already been committed
          logDebug(
            s"[Case1] Received push data from speculative task (shuffle $shuffleKey, map $mapId, " +
              s"attempt $attemptId), but this mapper has already ended.")
          callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.MAP_ENDED.getValue)))
        } else {
          logDebug(
            s"Received push data for committed hard split partition (shuffle $shuffleKey, " +
              s"map $mapId, attempt $attemptId)")
          workerSource.incCounter(WorkerSource.WRITE_DATA_HARD_SPLIT_COUNT)
          callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
        }
      } else {
        if (storageManager.shuffleKeySet().contains(shuffleKey)) {
          // There is no shuffle key in shuffleMapperAttempts, but the shuffle key exists
          // in StorageManager. This should be a HARD_SPLIT partition: after a worker restart,
          // some tasks may still push data to this HARD_SPLIT partition.
          logDebug(s"[Case2] Received push data for committed hard split partition " +
            s"(shuffle $shuffleKey, map $mapId, attempt $attemptId)")
          callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
        } else {
          logWarning(s"While handling PushData, partition location wasn't found for " +
            s"task (shuffle $shuffleKey, map $mapId, attempt $attemptId, uniqueId ${pushData.partitionUniqueId}).")
          callbackWithTimer.onFailure(
            new CelebornIOException(StatusCode.PUSH_DATA_FAIL_PARTITION_NOT_FOUND))
        }
      }
      return
    }

    // During worker shutdown, the worker returns HARD_SPLIT for all existing partitions.
    // This must happen before returning any exception so the current push data can be revived and retried.
    if (shutdown.get()) {
      logInfo(s"Push data returns HARD_SPLIT for shuffle $shuffleKey since the worker is shutting down.")
      callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
      return
    }

    val fileWriter = location.asInstanceOf[WorkingPartition].getFileWriter
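    // Fail fast if this writer has already hit an exception, reporting a primary/replica-specific failure.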
    val exception = fileWriter.getException
    if (exception != null) {
      val cause =
        if (isPrimary) {
          StatusCode.PUSH_DATA_WRITE_FAIL_PRIMARY
        } else {
          StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA
        }
      logError(
        s"While handling PushData, throw $cause, fileWriter $fileWriter has exception.",
        exception)
      workerSource.incCounter(WorkerSource.WRITE_DATA_FAIL_COUNT)
      callbackWithTimer.onFailure(new CelebornIOException(cause))
      return
    }

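    // Disk-usage check: HARD_SPLIT rejects this push outright, while SOFT_SPLIT accepts it
    // but signals the client to split the partition.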
    val splitStatus = checkDiskFullAndSplit(fileWriter, isPrimary)
    if (splitStatus == StatusCode.HARD_SPLIT) {
      workerSource.incCounter(WorkerSource.WRITE_DATA_HARD_SPLIT_COUNT)
      callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
      return
    } else if (splitStatus == StatusCode.SOFT_SPLIT) {
      softSplit = true
    }

    fileWriter.incrementPendingWrites()

    if (fileWriter.isClosed) {
      val fileInfo = fileWriter.getCurrentFileInfo
      logWarning(
        s"[handlePushData] FileWriter is already closed! File path ${fileInfo.getFilePath} " +
          s"length ${fileInfo.getFileLength}")
      callbackWithTimer.onFailure(new CelebornIOException("File already closed!"))
      fileWriter.decrementPendingWrites()
      return
    }
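    // The result of the local write is delivered asynchronously through this promise.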
    val writePromise = Promise[Array[StatusCode]]()
    // For the primary, send data to the replica.
    if (doReplicate) {
      val peer = location.getPeer
      val peerWorker = new WorkerInfo(
        peer.getHost,
        peer.getRpcPort,
        peer.getPushPort,
        peer.getFetchPort,
        peer.getReplicatePort)
      if (unavailablePeers.containsKey(peerWorker)) {
        fileWriter.decrementPendingWrites()
        handlePushDataConnectionFail(callbackWithTimer, location)
        return
      }

      pushData.body().retain()
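      // Replicate asynchronously; the body is retained above because the replication task may outlive this call.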
      replicateThreadPool.submit(new Runnable {
        override def run(): Unit = {
          if (unavailablePeers.containsKey(peerWorker)) {
            pushData.body().release()
            handlePushDataConnectionFail(callbackWithTimer, location)
            return
          }
          // Handle the response from replica
          val wrappedCallback = new RpcResponseCallback() {
            override def onSuccess(response: ByteBuffer): Unit = {
              Try(Await.result(writePromise.future, Duration.Inf)) match {
                case Success(result) =>
                  if (result(0) != StatusCode.SUCCESS) {
                    callback.onSuccess(ByteBuffer.wrap(Array[Byte](result(0).getValue)))
                  } else {
                    if (response.remaining() > 0) {
                      val resp = ByteBuffer.allocate(response.remaining())
                      resp.put(response)
                      resp.flip()
                      callbackWithTimer.onSuccess(resp)
                    } else if (softSplit) {
                      // TODO: Currently, if the worker is in soft split status, we don't return the congest
                      // status, on the assumption that the client will quickly stop pushing data to this
                      // worker. In the long term, especially if this happens frequently, we may need to
                      // return the congest and soft split statuses together.
                      callbackWithTimer.onSuccess(
                        ByteBuffer.wrap(Array[Byte](StatusCode.SOFT_SPLIT.getValue)))
                    } else {
                      Option(CongestionController.instance()) match {
                        case Some(congestionController) =>
                          if (congestionController.isUserCongested(
                              fileWriter.getUserCongestionControlContext)) {
                            // Check whether the primary congests the data even though the replica
                            // doesn't (the response is empty).
                            callbackWithTimer.onSuccess(
                              ByteBuffer.wrap(
                                Array[Byte](
                                  StatusCode.PUSH_DATA_SUCCESS_PRIMARY_CONGESTED.getValue)))
                          } else {
                            callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte]()))
                          }
                        case None =>
                          callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte]()))
                      }
                    }
                  }
                case Failure(e) => callbackWithTimer.onFailure(e)
              }
            }

            override def onFailure(e: Throwable): Unit = {
              logError(s"PushData replication failed for partitionLocation: $location", e)
              // 1. PUSH_DATA_WRITE_FAIL_REPLICA is thrown by the replica peer worker
              // 2. PUSH_DATA_TIMEOUT_REPLICA is thrown by TransportResponseHandler
              // 3. IOException is thrown by the channel and converted to PUSH_DATA_CONNECTION_EXCEPTION_REPLICA
              if (e.getMessage.startsWith(StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA.name())) {
                workerSource.incCounter(WorkerSource.REPLICATE_DATA_WRITE_FAIL_COUNT)
                callbackWithTimer.onFailure(e)
              } else if (e.getMessage.startsWith(StatusCode.PUSH_DATA_TIMEOUT_REPLICA.name())) {
                workerSource.incCounter(WorkerSource.REPLICATE_DATA_TIMEOUT_COUNT)
                callbackWithTimer.onFailure(e)
              } else if (ExceptionUtils.connectFail(e.getMessage)) {
                workerSource.incCounter(WorkerSource.REPLICATE_DATA_CONNECTION_EXCEPTION_COUNT)
                callbackWithTimer.onFailure(
                  new CelebornIOException(StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_REPLICA))
              } else {
                workerSource.incCounter(WorkerSource.REPLICATE_DATA_FAIL_NON_CRITICAL_CAUSE_COUNT)
                callbackWithTimer.onFailure(
                  new CelebornIOException(StatusCode.PUSH_DATA_FAIL_NON_CRITICAL_CAUSE_REPLICA))
              }
            }
          }
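          // Forward the data to the replica as a REPLICA-mode PushData, reusing the retained body.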
          try {
            val client = getReplicateClient(peer.getHost, peer.getReplicatePort, location.getId)
            val newPushData = new PushData(
              PartitionLocation.Mode.REPLICA.mode(),
              shuffleKey,
              pushData.partitionUniqueId,
              pushData.body)
            client.pushData(newPushData, shufflePushDataTimeout.get(shuffleKey), wrappedCallback)
          } catch {
            case e: Exception =>
              pushData.body().release()
              unavailablePeers.put(peerWorker, System.currentTimeMillis())
              workerSource.incCounter(WorkerSource.REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT)
              logError(
                s"PushData replication failed during connecting peer for partitionLocation: $location",
                e)
              callbackWithTimer.onFailure(
                new CelebornIOException(StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA))
          }
        }
      })
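      // Write locally in parallel with the in-flight replication; the replica response callback
      // above waits on writePromise before answering the client.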
      writeLocalData(Seq(fileWriter), body, shuffleKey, isPrimary, None, writePromise)
    } else {
      // The code here is executed if
      // 1. the client doesn't enable pushing data to the replica, in which case the primary worker reaches here
      // 2. the client enables pushing data to the replica, in which case the replica worker reaches here
      // TODO: Currently, if the worker is in soft split status, we don't return the congest status, on the
      // assumption that the client will quickly stop pushing data to this worker. In the long term, especially
      // if this happens frequently, we may need to return the congest and soft split statuses together.
      writeLocalData(Seq(fileWriter), body, shuffleKey, isPrimary, None, writePromise)
      Try(Await.result(writePromise.future, Duration.Inf)) match {
        case Success(result) =>
          if (result(0) != StatusCode.SUCCESS) {
            callback.onSuccess(ByteBuffer.wrap(Array[Byte](result(0).getValue)))
          } else {
            if (softSplit) {
              callbackWithTimer.onSuccess(
                ByteBuffer.wrap(Array[Byte](StatusCode.SOFT_SPLIT.getValue)))
            } else {
              Option(CongestionController.instance()) match {
                case Some(congestionController) =>
                  if (congestionController.isUserCongested(
                      fileWriter.getUserCongestionControlContext)) {
                    if (isPrimary) {
                      callbackWithTimer.onSuccess(
                        ByteBuffer.wrap(
                          Array[Byte](StatusCode.PUSH_DATA_SUCCESS_PRIMARY_CONGESTED.getValue)))
                    } else {
                      callbackWithTimer.onSuccess(
                        ByteBuffer.wrap(
                          Array[Byte](StatusCode.PUSH_DATA_SUCCESS_REPLICA_CONGESTED.getValue)))
                    }
                  } else {
                    callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte]()))
                  }
                case None =>
                  callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte]()))
              }
            }
          }
        case Failure(e) => callbackWithTimer.onFailure(e)
      }
    }
  }
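
The handler answers the client with at most one status byte: an empty body means plain success, while outcomes such as MAP_ENDED, HARD_SPLIT, SOFT_SPLIT and the congestion statuses are wrapped as a single byte. As a minimal sketch of that convention, the snippet below shows how a caller might decode such a response. It is illustrative only: PushDataResponseDecoder is a hypothetical helper rather than part of Celeborn's client API, and the StatusCode import path is assumed.

  import java.nio.ByteBuffer

  // StatusCode is the enum referenced by the handler above; this import path is assumed.
  import org.apache.celeborn.common.protocol.message.StatusCode

  object PushDataResponseDecoder {

    // Hypothetical helper: map the response buffer back to the StatusCode the handler encoded.
    // An empty buffer means the push succeeded with nothing special to report.
    def decode(response: ByteBuffer): StatusCode = {
      if (!response.hasRemaining) {
        StatusCode.SUCCESS
      } else {
        // Absolute read so the buffer's position is left untouched.
        val statusByte = response.get(response.position())
        StatusCode.values()
          .find(_.getValue == statusByte)
          .getOrElse(throw new IllegalStateException(
            s"Unexpected push data status byte: $statusByte"))
      }
    }
  }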