public DfsPartitionReader()

in client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java [61:174]


  public DfsPartitionReader(
      CelebornConf conf,
      String shuffleKey,
      PartitionLocation location,
      TransportClientFactory clientFactory,
      int startMapIndex,
      int endMapIndex)
      throws IOException {
    shuffleChunkSize = (int) conf.shuffleChunkSize();
    fetchMaxReqsInFlight = conf.clientFetchMaxReqsInFlight();
    results = new LinkedBlockingQueue<>();

    this.location = location;

    final List<Long> chunkOffsets = new ArrayList<>();
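    // A bounded map range (endMapIndex != Integer.MAX_VALUE) means a range read that relies on
    // the worker-sorted file; otherwise the whole original file is read.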
    if (endMapIndex != Integer.MAX_VALUE) {
      long fetchTimeoutMs = conf.clientFetchTimeoutMs();
      try {
        TransportClient client =
            clientFactory.createClient(location.getHost(), location.getFetchPort());
        TransportMessage openStream =
            new TransportMessage(
                MessageType.OPEN_STREAM,
                PbOpenStream.newBuilder()
                    .setShuffleKey(shuffleKey)
                    .setFileName(location.getFileName())
                    .setStartIndex(startMapIndex)
                    .setEndIndex(endMapIndex)
                    .build()
                    .toByteArray());
        ByteBuffer response = client.sendRpcSync(openStream.toByteBuffer(), fetchTimeoutMs);
        // Parse the response to make sure the sort on the worker side is done.
        TransportMessage.fromByteBuffer(response).getParsedPayload();
      } catch (IOException | InterruptedException e) {
        throw new IOException(
            "read shuffle file from HDFS failed, filePath: "
                + location.getStorageInfo().getFilePath(),
            e);
      }
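      // Sorting has been confirmed by the OPEN_STREAM response above, so the sorted file and
      // its index are used to compute the chunk offsets for the requested map range.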
      hdfsInputStream =
          ShuffleClient.getHdfsFs(conf)
              .open(new Path(Utils.getSortedFilePath(location.getStorageInfo().getFilePath())));
      chunkOffsets.addAll(
          getChunkOffsetsFromSortedIndex(conf, location, startMapIndex, endMapIndex));
    } else {
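      // Full read: open the original (unsorted) file and take chunk offsets from the
      // unsorted index.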
      hdfsInputStream =
          ShuffleClient.getHdfsFs(conf).open(new Path(location.getStorageInfo().getFilePath()));
      chunkOffsets.addAll(getChunkOffsetsFromUnsortedIndex(conf, location));
    }
    logger.debug(
        "DFS {} index count:{} offsets:{}",
        location.getStorageInfo().getFilePath(),
        chunkOffsets.size(),
        chunkOffsets);
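    // chunkOffsets holds chunk boundaries, so k offsets delimit k - 1 chunks; the fetch thread
    // is only started when there is at least one chunk to read.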
    if (chunkOffsets.size() > 1) {
      numChunks = chunkOffsets.size() - 1;
      fetchThread =
          new Thread(
              () -> {
                try {
                  while (!closed && currentChunkIndex < numChunks) {
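                    // Simple back-pressure: pause while enough chunks are already buffered
                    // in the results queue.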
                    while (results.size() >= fetchMaxReqsInFlight) {
                      Thread.sleep(50);
                    }
                    long offset = chunkOffsets.get(currentChunkIndex);
                    long length = chunkOffsets.get(currentChunkIndex + 1) - offset;
                    logger.debug("read {} offset {} length {}", currentChunkIndex, offset, length);
                    byte[] buffer = new byte[(int) length];
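                    // Positioned read of the whole chunk; if it fails, reopen the stream once
                    // and retry before recording the failure.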
                    try {
                      hdfsInputStream.readFully(offset, buffer);
                    } catch (IOException e) {
                      logger.warn(
                          "read HDFS {} failed, will retry, error detail {}",
                          location.getStorageInfo().getFilePath(),
                          e);
                      try {
                        hdfsInputStream.close();
                        hdfsInputStream =
                            ShuffleClient.getHdfsFs(conf)
                                .open(
                                    new Path(
                                        Utils.getSortedFilePath(
                                            location.getStorageInfo().getFilePath())));
                        hdfsInputStream.readFully(offset, buffer);
                      } catch (IOException ex) {
                        logger.warn(
                            "retry read HDFS {} failed, error detail {}",
                            location.getStorageInfo().getFilePath(),
                            ex);
                        exception.set(ex);
                        break;
                      }
                    }
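                    // Queue the chunk so it can be consumed by the reader.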
                    results.put(Unpooled.wrappedBuffer(buffer));
                    logger.debug("add index {} to results", currentChunkIndex);
                    currentChunkIndex++;
                  }
                } catch (Exception e) {
                  // A cancelled speculative task may interrupt this thread; ignore the exception.
                  logger.warn("Fetch thread is cancelled.", e);
                }
                logger.debug("fetch {} is done.", location.getStorageInfo().getFilePath());
              },
              "Dfs-fetch-thread" + location.getStorageInfo().getFilePath());
      fetchThread.setUncaughtExceptionHandler(
          (t, e) -> logger.error("thread {} failed with exception.", t, e));
      fetchThread.start();
      logger.debug("Start dfs read on location {}", location);
    }
  }