private def handleReduceOpenStreamInternal()

in worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala [239:343]


  /**
   * Opens a read stream over a reduce-partition file and describes it in a [[PbStreamHandlerOpt]].
   *
   * A sorted view of the file (from `partitionsSorter.getSortedFileInfo`) is used when either:
   *  1. the request is a range read: `endIndex` is neither `Int.MaxValue` nor `-1` and
   *     `endIndex >= startIndex`; or
   *  2. the request is a non-range read (`endIndex == Int.MaxValue`) but the raw file info
   *     rejects the new stream (`addStream` returns false), e.g. because the original unsorted
   *     file has been deleted by another range's openStream request.
   * Otherwise the raw (unsorted) file info is used directly.
   *
   * @param client           requesting client; only used for its remote address in log and
   *                         error messages
   * @param shuffleKey       key of the shuffle the file belongs to
   * @param fileName         name of the partition file to open
   * @param startIndex       start of the requested map range
   * @param endIndex         end of the requested map range; `Int.MaxValue` denotes a
   *                         non-range request. NOTE(review): `-1` is excluded from range
   *                         handling and also never reaches `addStream` — confirm intent.
   * @param readLocalShuffle when true and the file is a [[DiskFileInfo]], the returned handler
   *                         also carries the chunk offsets and the local file path
   * @return a SUCCESS option carrying the stream handler, or an OPEN_STREAM_FAILED option with
   *         an error message when an `IOException` is thrown. Other exceptions propagate to
   *         the caller. In every case the OPEN_STREAM_TIME timer for `shuffleKey` is stopped.
   */
  private def handleReduceOpenStreamInternal(
      client: TransportClient,
      shuffleKey: String,
      fileName: String,
      startIndex: Int,
      endIndex: Int,
      readLocalShuffle: Boolean = false): PbStreamHandlerOpt = {
    try {
      logDebug(s"Received open stream request $shuffleKey $fileName $startIndex " +
        s"$endIndex get file name $fileName from client channel " +
        s"${NettyUtils.getRemoteAddress(client.getChannel)}")

      val rawFileInfo = getRawFileInfo(shuffleKey, fileName)
      val streamId = chunkStreamManager.nextStreamId()

      // Pure check: is this a genuine map-range request?
      val isRangeRead = endIndex != Int.MaxValue && endIndex != -1 && endIndex >= startIndex
      // `addStream` has side effects, so it must only be evaluated for non-range requests
      // (`endIndex == Int.MaxValue`) that are not range reads; `||` / `&&` short-circuiting
      // guarantees exactly that. A `false` result means the unsorted file can no longer be
      // served and the sorted view must be used instead.
      val needsSortedFile =
        isRangeRead || (endIndex == Int.MaxValue && !rawFileInfo.addStream(streamId))

      val fileInfo =
        if (needsSortedFile) {
          partitionsSorter.getSortedFileInfo(
            shuffleKey,
            fileName,
            rawFileInfo,
            startIndex,
            endIndex)
        } else {
          rawFileInfo
        }
      val meta = fileInfo.getReduceFileMeta

      val streamHandler = fileInfo match {
        case diskInfo: DiskFileInfo if readLocalShuffle =>
          // Local-read handler: carries chunk offsets and the file path in addition to the
          // chunk count; no chunk buffers are registered for the stream.
          chunkStreamManager.registerStream(streamId, shuffleKey, fileName)
          makeStreamHandler(
            streamId,
            meta.getNumChunks,
            meta.getChunkOffsets,
            diskInfo.getFilePath)

        case diskInfo: DiskFileInfo if diskInfo.isHdfs || diskInfo.isS3 || diskInfo.isOSS =>
          // Remote-storage files: registered without chunk buffers, advertising zero chunks.
          chunkStreamManager.registerStream(streamId, shuffleKey, fileName)
          makeStreamHandler(streamId, numChunks = 0)

        case _ =>
          // Chunks are served by this worker from a buffer view over the file.
          val managedBuffer = fileInfo match {
            case df: DiskFileInfo => new FileChunkBuffers(df, transportConf)
            case mf: MemoryFileInfo => new MemoryChunkBuffers(mf)
          }
          // A fetch-time metric is only looked up for disk files; other file infos pass null.
          val fetchTimeMetric = fileInfo match {
            case info: DiskFileInfo => storageManager.getFetchTimeMetric(info.getFile)
            case _ => null
          }
          chunkStreamManager.registerStream(
            streamId,
            shuffleKey,
            managedBuffer,
            fileName,
            fetchTimeMetric)
          if (meta.getNumChunks == 0) {
            logDebug(s"StreamId $streamId, fileName $fileName, mapRange " +
              s"[$startIndex-$endIndex] is empty. Received from client channel " +
              s"${NettyUtils.getRemoteAddress(client.getChannel)}")
          } else {
            logDebug(
              s"StreamId $streamId, fileName $fileName, numChunks ${meta.getNumChunks}, " +
                s"mapRange [$startIndex-$endIndex]. Received from client channel " +
                s"${NettyUtils.getRemoteAddress(client.getChannel)}")
          }
          makeStreamHandler(streamId, meta.getNumChunks)
      }

      workerSource.incCounter(WorkerSource.OPEN_STREAM_SUCCESS_COUNT)
      PbStreamHandlerOpt.newBuilder().setStreamHandler(streamHandler)
        .setStatus(StatusCode.SUCCESS.getValue)
        .build()
    } catch {
      case e: IOException =>
        // Reported back to the client as a status code rather than rethrown.
        val msg =
          s"Read file: $fileName with shuffleKey: $shuffleKey error from ${NettyUtils.getRemoteAddress(
            client.getChannel)}, Exception: ${e.getMessage}"
        PbStreamHandlerOpt.newBuilder().setStatus(StatusCode.OPEN_STREAM_FAILED.getValue)
          .setErrorMsg(msg).build()
    } finally {
      // Stops the open-stream timer for this shuffle key, on success and failure alike.
      workerSource.stopTimer(WorkerSource.OPEN_STREAM_TIME, shuffleKey)
    }
  }