private def handleOpenStreamInternal()

in worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala [177:241]


  /**
   * Handles an open-stream request for a single partition file and answers the client through
   * `replyStreamHandler`.
   *
   * The raw file info is looked up with `getRawFileInfo`; what happens next depends on its
   * partition type:
   *  - `REDUCE`: unless `endIndex` is `Integer.MAX_VALUE`, the file info is replaced by the one
   *    returned from `partitionsSorter.getSortedFileInfo` for `[startIndex, endIndex]`. An HDFS
   *    file is answered with stream id 0 and 0 chunks; any other file is registered with
   *    `chunkStreamManager` and the resulting stream id and chunk count are sent back.
   *  - `MAP`: a stream is registered with `creditStreamManager`; the reply (with 0 chunks) is
   *    sent by the consumer handed to that registration once it receives the stream id.
   *  - `MAPGROUP`: nothing is done.
   *
   * An `IOException` raised anywhere in this flow is passed to `handleRpcIOException`.
   *
   * @param client        client that issued the request; replies go to it and its channel is
   *                      used for logging and credit-stream registration
   * @param shuffleKey    shuffle key used to look up the file info
   * @param fileName      file name used to look up the file info
   * @param startIndex    start of the requested range
   * @param endIndex      end of the requested range; `Integer.MAX_VALUE` skips the sorted-file
   *                      lookup for `REDUCE` partitions
   * @param initialCredit initial credit forwarded to `creditStreamManager` (`MAP` only)
   * @param request       originating RPC request; its `requestId` is used in every reply
   * @param isLegacy      forwarded unchanged to `replyStreamHandler`
   */
  private def handleOpenStreamInternal(
      client: TransportClient,
      shuffleKey: String,
      fileName: String,
      startIndex: Int,
      endIndex: Int,
      initialCredit: Int,
      request: RpcRequest,
      isLegacy: Boolean): Unit = {
    // A method rather than a value: the address is resolved only at the point where a log
    // message is actually being assembled.
    def remoteAddress = NettyUtils.getRemoteAddress(client.getChannel)

    try {
      val rawFileInfo = getRawFileInfo(shuffleKey, fileName)
      rawFileInfo.getPartitionType match {
        case PartitionType.REDUCE =>
          // Integer.MAX_VALUE as endIndex means the raw file is served as-is.
          val fileInfo =
            if (endIndex == Integer.MAX_VALUE) {
              rawFileInfo
            } else {
              partitionsSorter.getSortedFileInfo(
                shuffleKey,
                fileName,
                rawFileInfo,
                startIndex,
                endIndex)
            }
          logDebug(s"Received chunk fetch request $shuffleKey $fileName $startIndex " +
            s"$endIndex get file info $fileInfo from client channel " +
            s"${remoteAddress}")

          if (fileInfo.isHdfs) {
            // HDFS files get a placeholder handle: stream id 0 with 0 chunks.
            replyStreamHandler(client, request.requestId, 0, 0, isLegacy)
          } else {
            val managedBuffers = new FileManagedBuffers(fileInfo, transportConf)
            val fetchTimeMetric = storageManager.getFetchTimeMetric(fileInfo.getFile)
            val streamId =
              chunkStreamManager.registerStream(shuffleKey, managedBuffers, fetchTimeMetric)

            if (fileInfo.numChunks() == 0) {
              logDebug(s"StreamId $streamId, fileName $fileName, mapRange " +
                s"[$startIndex-$endIndex] is empty. Received from client channel " +
                s"${remoteAddress}")
            } else {
              logDebug(
                s"StreamId $streamId, fileName $fileName, numChunks ${fileInfo.numChunks()}, " +
                  s"mapRange [$startIndex-$endIndex]. Received from client channel " +
                  s"${remoteAddress}")
            }
            replyStreamHandler(client, request.requestId, streamId, fileInfo.numChunks(), isLegacy)
          }

        case PartitionType.MAP =>
          // The stream id is delivered through this callback, which then replies to the client.
          val replyOnRegistered = new Consumer[java.lang.Long] {
            override def accept(streamId: java.lang.Long): Unit = {
              replyStreamHandler(client, request.requestId, streamId, 0, isLegacy)
            }
          }
          creditStreamManager.registerStream(
            replyOnRegistered,
            client.getChannel,
            initialCredit,
            startIndex,
            endIndex,
            rawFileInfo)

        case PartitionType.MAPGROUP =>
        // NOTE(review): no stream is registered and no reply is sent for MAPGROUP here;
        // presumably this partition type is not supported yet -- confirm.
      }
    } catch {
      case ioe: IOException =>
        handleRpcIOException(client, request.requestId, shuffleKey, fileName, ioe)
    }
  }