public DfsPartitionReader()

in client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java [84:182]


  /**
   * Creates a reader for one partition location whose shuffle data is stored on a distributed
   * file system.
   *
   * <p>The constructor selects the Hadoop file system matching the location's storage type, obtains
   * a stream handler (by sending an {@code OPEN_STREAM} RPC when none is supplied), opens the data
   * file and loads the chunk offsets that bound the chunk range to read.
   *
   * @param conf client configuration; supplies the DFS read chunk size, the maximum number of
   *     in-flight fetch requests, the fetch timeout and the checkpoint switch
   * @param shuffleKey shuffle key sent in the {@code OPEN_STREAM} request
   * @param location partition location to read; its storage info supplies the storage type and
   *     the file path
   * @param pbStreamHandler stream handler to reuse, or {@code null} to open a new stream through
   *     an RPC to the location's host and fetch port
   * @param startMapIndex start index sent in the {@code OPEN_STREAM} request and used when loading
   *     offsets from the sorted index
   * @param endMapIndex end index; when it is {@code Integer.MAX_VALUE} or {@code -1} the original
   *     file and its unsorted index are read, otherwise the sorted file and its sorted index
   * @param metricsCallback callback stored on the reader
   * @param startChunkIndex first chunk to read; {@code -1} means chunk {@code 0}
   * @param endChunkIndex last chunk to read; {@code -1} means the last available chunk, and larger
   *     values are clamped to the last available chunk
   * @param checkpointMetadata checkpoint state to resume from; when empty, fresh metadata is
   *     created only if checkpointing is enabled in {@code conf}
   * @throws IOException if the stream cannot be opened (including when the calling thread is
   *     interrupted while doing so), or if the data file or its chunk offsets cannot be read
   */
  public DfsPartitionReader(
      CelebornConf conf,
      String shuffleKey,
      PartitionLocation location,
      PbStreamHandler pbStreamHandler,
      TransportClientFactory clientFactory,
      int startMapIndex,
      int endMapIndex,
      MetricsCallback metricsCallback,
      int startChunkIndex,
      int endChunkIndex,
      Optional<PartitionReaderCheckpointMetadata> checkpointMetadata)
      throws IOException {
    this.conf = conf;
    shuffleChunkSize = conf.dfsReadChunkSize();
    fetchMaxReqsInFlight = conf.clientFetchMaxReqsInFlight();
    results = new LinkedBlockingQueue<>();

    this.metricsCallback = metricsCallback;
    this.location = location;

    // S3 and OSS have dedicated file systems; every other case (including a non-null storage info
    // of any other type, or a missing storage info) falls back to the HDFS file system so that
    // hadoopFs is always assigned.
    StorageInfo.Type fileSystemType = StorageInfo.Type.HDFS;
    if (location.getStorageInfo() != null) {
      StorageInfo.Type declaredType = location.getStorageInfo().getType();
      if (declaredType == StorageInfo.Type.S3 || declaredType == StorageInfo.Type.OSS) {
        fileSystemType = declaredType;
      }
    }
    this.hadoopFs = ShuffleClient.getHadoopFs(conf).get(fileSystemType);

    long fetchTimeoutMs = conf.clientFetchTimeoutMs();
    try {
      client = clientFactory.createClient(location.getHost(), location.getFetchPort());
      if (pbStreamHandler == null) {
        TransportMessage openStream =
            new TransportMessage(
                MessageType.OPEN_STREAM,
                PbOpenStream.newBuilder()
                    .setShuffleKey(shuffleKey)
                    .setFileName(location.getFileName())
                    .setStartIndex(startMapIndex)
                    .setEndIndex(endMapIndex)
                    .build()
                    .toByteArray());
        ByteBuffer response = client.sendRpcSync(openStream.toByteBuffer(), fetchTimeoutMs);
        streamHandler = TransportMessage.fromByteBuffer(response).getParsedPayload();
        // Parse this message to ensure sort is done.
      } else {
        streamHandler = pbStreamHandler;
      }
    } catch (IOException | InterruptedException e) {
      if (e instanceof InterruptedException) {
        // Restore the interrupt flag so callers can still observe the interruption.
        Thread.currentThread().interrupt();
      }
      throw new IOException(
          "read shuffle file from DFS failed, filePath: " + location.getStorageInfo().getFilePath(),
          e);
    }

    try {
      if (endMapIndex != Integer.MAX_VALUE && endMapIndex != -1) {
        dataFilePath = new Path(Utils.getSortedFilePath(location.getStorageInfo().getFilePath()));
        dfsInputStream = hadoopFs.open(dataFilePath);
        chunkOffsets.addAll(
            getChunkOffsetsFromSortedIndex(conf, location, startMapIndex, endMapIndex));
      } else {
        dataFilePath = new Path(location.getStorageInfo().getFilePath());
        dfsInputStream = hadoopFs.open(dataFilePath);
        chunkOffsets.addAll(getChunkOffsetsFromUnsortedIndex(conf, location));
      }
    } catch (IOException | RuntimeException e) {
      // A failed constructor hands no object back to the caller, so an already opened stream
      // must be released here or it would leak.
      if (dfsInputStream != null) {
        try {
          dfsInputStream.close();
        } catch (IOException closeFailure) {
          e.addSuppressed(closeFailure);
        }
      }
      throw e;
    }

    this.startChunkIndex = startChunkIndex == -1 ? 0 : startChunkIndex;
    // The index of the last chunk is chunkOffsets.size() - 2; a requested end index is clamped
    // to it.
    this.endChunkIndex =
        endChunkIndex == -1
            ? chunkOffsets.size() - 2
            : Math.min(chunkOffsets.size() - 2, endChunkIndex);
    this.currentChunkIndex = this.startChunkIndex;
    this.numChunks = this.endChunkIndex - this.startChunkIndex + 1;

    if (checkpointMetadata.isPresent()) {
      // Resume from the supplied checkpoint: chunks it lists as returned count as already done.
      this.partitionReaderCheckpointMetadata = checkpointMetadata;
      this.returnedChunks = checkpointMetadata.get().getReturnedChunks().size();
    } else {
      this.partitionReaderCheckpointMetadata =
          conf.isPartitionReaderCheckpointEnabled()
              ? Optional.of(new PartitionReaderCheckpointMetadata())
              : Optional.empty();
    }
    logger.debug(
        "DFS {} total offset count:{} chunk count: {} "
            + "start chunk index:{} end chunk index:{} offsets:{}",
        location.getStorageInfo().getFilePath(),
        chunkOffsets.size(),
        this.numChunks,
        this.startChunkIndex,
        this.endChunkIndex,
        chunkOffsets);
    if (this.numChunks > 0) {
      // The fetch executor is only created when there is at least one chunk to read.
      fetchThread =
          ThreadUtils.newDaemonSingleThreadExecutor(
              "celeborn-client-dfs-partition-fetcher" + location.getStorageInfo().getFilePath());
      logger.debug("Start dfs read on location {}", location);
      ShuffleClient.incrementTotalReadCounter();
    }
  }