override def receive()

in worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala [88:175]


  /**
   * Dispatches an incoming request to the handler matching its concrete message type.
   *
   *  - `BufferStreamEnd`   -> `handleEndStreamFromClient`
   *  - `ReadAddCredit`     -> `handleReadAddCredit`
   *  - `ChunkFetchRequest` -> `handleChunkFetchRequest`
   *  - `RpcRequest`        -> treated as an open-stream request (see below)
   *
   * For an `RpcRequest` the body is first parsed as a `TransportMessage` carrying a
   * `PbOpenStream` payload. If anything on that path throws an `Exception` (including the
   * call to `handleOpenStreamInternal`), the body is decoded again as a legacy
   * `OpenStream` / `OpenStreamWithCredit` message and handled through the legacy path.
   * An `IOException` raised on the legacy path is reported through `handleRpcIOException`;
   * any other exception raised there propagates to the caller.
   *
   * The request body is always released once handling finishes. The `OPEN_STREAM_TIME`
   * timer is stopped only if it was actually started, using the key it was started with.
   *
   * @param client transport client the request arrived on; forwarded to the handlers
   * @param msg    the request to dispatch
   * @throws IllegalArgumentException if `msg` is none of the supported message types
   */
  override def receive(client: TransportClient, msg: RequestMessage): Unit = {
    msg match {
      case r: BufferStreamEnd =>
        handleEndStreamFromClient(r)
      case r: ReadAddCredit =>
        handleReadAddCredit(r)
      case r: ChunkFetchRequest =>
        handleChunkFetchRequest(client, r)
      case r: RpcRequest =>
        // process PbOpenStream RPC
        // Tracked outside the try blocks so the IOException handler can report them.
        var streamShuffleKey: String = null
        var streamFileName: String = null
        // Key the OPEN_STREAM_TIME timer was started with. It stays empty when the timer was
        // never started (legacy request or parse failure), so `finally` does not stop a timer
        // this call does not own, nor pass a null / mismatched key to stopTimer.
        var timedShuffleKey: Option[String] = None
        try {
          val pbMsg = TransportMessage.fromByteBuffer(r.body().nioByteBuffer())
          val pbOpenStream = pbMsg.getParsedPayload[PbOpenStream]
          val (shuffleKey, fileName, startIndex, endIndex, initialCredit) =
            (
              pbOpenStream.getShuffleKey,
              pbOpenStream.getFileName,
              pbOpenStream.getStartIndex,
              pbOpenStream.getEndIndex,
              pbOpenStream.getInitialCredit)
          streamShuffleKey = shuffleKey
          streamFileName = fileName
          workerSource.startTimer(WorkerSource.OPEN_STREAM_TIME, shuffleKey)
          timedShuffleKey = Some(shuffleKey)
          handleOpenStreamInternal(
            client,
            shuffleKey,
            fileName,
            startIndex,
            endIndex,
            initialCredit,
            r,
            false)
        } catch {
          case _: Exception =>
            // process legacy OpenStream RPCs
            logDebug("Open stream with legacy RPCs")
            try {
              val decodedMsg = Message.decode(r.body().nioByteBuffer())
              // Anything that is not OPEN_STREAM is assumed to be OpenStreamWithCredit.
              val (shuffleKey, fileName) =
                if (decodedMsg.`type`() == Type.OPEN_STREAM) {
                  val openStream = decodedMsg.asInstanceOf[OpenStream]
                  (
                    new String(openStream.shuffleKey, StandardCharsets.UTF_8),
                    new String(openStream.fileName, StandardCharsets.UTF_8))
                } else {
                  val openStreamWithCredit = decodedMsg.asInstanceOf[OpenStreamWithCredit]
                  (
                    new String(openStreamWithCredit.shuffleKey, StandardCharsets.UTF_8),
                    new String(openStreamWithCredit.fileName, StandardCharsets.UTF_8))
                }
              streamShuffleKey = shuffleKey
              streamFileName = fileName
              var startIndex = 0
              var endIndex = 0
              var initialCredit = 0
              // Which fields are read from the legacy message depends on the partition type
              // recorded for the file, not on the message type checked above.
              getRawFileInfo(shuffleKey, fileName).getPartitionType match {
                case PartitionType.REDUCE =>
                  startIndex = decodedMsg.asInstanceOf[OpenStream].startMapIndex
                  endIndex = decodedMsg.asInstanceOf[OpenStream].endMapIndex
                case PartitionType.MAP =>
                  initialCredit = decodedMsg.asInstanceOf[OpenStreamWithCredit].initialCredit
                  startIndex = decodedMsg.asInstanceOf[OpenStreamWithCredit].startIndex
                  endIndex = decodedMsg.asInstanceOf[OpenStreamWithCredit].endIndex
                case PartitionType.MAPGROUP =>
                // Nothing is extracted; indices and credit keep their default of 0.
              }
              handleOpenStreamInternal(
                client,
                shuffleKey,
                fileName,
                startIndex,
                endIndex,
                initialCredit,
                r,
                true)
            } catch {
              case e: IOException =>
                handleRpcIOException(client, r.requestId, streamShuffleKey, streamFileName, e)
            }
        } finally {
          r.body().release()
          timedShuffleKey.foreach { key =>
            workerSource.stopTimer(WorkerSource.OPEN_STREAM_TIME, key)
          }
        }
      case unknown: RequestMessage =>
        throw new IllegalArgumentException(s"Unknown message type id: ${unknown.`type`.id}")
    }
  }