protected void init(String fullShufflePath)

in storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopClientReadHandler.java [123:188]


  /**
   * Scans {@code fullShufflePath} for shuffle index files and creates one
   * {@link HadoopShuffleReadHandler} per matching file, appending them to {@code readHandlers}.
   *
   * <p>Only files ending with {@link Constants#SHUFFLE_INDEX_FILE_SUFFIX} are considered; when
   * {@code shuffleServerId} is non-null, files must also start with that server id. The resulting
   * handler list is shuffled so concurrent readers spread their load across files.
   *
   * @param fullShufflePath base directory that holds the index/data files for this partition
   * @throws RssException if a {@link FileSystem} cannot be obtained for the path
   */
  protected void init(String fullShufflePath) {
    Path baseFolder = new Path(fullShufflePath);
    FileSystem fs;
    try {
      fs = HadoopFilesystemProvider.getFilesystem(baseFolder, hadoopConf);
    } catch (Exception e) {
      // Chain the cause so the real failure (kerberos, connectivity, config, ...) is not lost.
      throw new RssException("Can't get FileSystem for " + baseFolder, e);
    }

    FileStatus[] indexFiles;
    try {
      // List only index files, optionally restricted to a single shuffle server's output.
      indexFiles =
          fs.listStatus(
              baseFolder,
              file ->
                  file.getName().endsWith(Constants.SHUFFLE_INDEX_FILE_SUFFIX)
                      && (shuffleServerId == null || file.getName().startsWith(shuffleServerId)));
    } catch (FileNotFoundException e) {
      // Not an error: data may simply not have been flushed to this directory yet.
      LOG.info(
          "Directory["
              + baseFolder
              + "] not found. The data may not be flushed to this directory. Nothing will be read.");
      return;
    } catch (Exception e) {
      LOG.error("Can't list index file in  " + baseFolder, e);
      return;
    }

    if (indexFiles != null && indexFiles.length != 0) {
      for (FileStatus status : indexFiles) {
        LOG.info(
            "Find index file for shuffleId["
                + shuffleId
                + "], partitionId["
                + partitionId
                + "] "
                + status.getPath());
        String filePrefix = getFileNamePrefix(status.getPath().toUri().toString());
        try {
          HadoopShuffleReadHandler handler =
              new HadoopShuffleReadHandler(
                  appId,
                  shuffleId,
                  partitionId,
                  filePrefix,
                  readBufferSize,
                  expectBlockIds,
                  processBlockIds,
                  hadoopConf,
                  distributionType,
                  expectTaskIds,
                  offHeapEnable);
          readHandlers.add(handler);
        } catch (Exception e) {
          // Best-effort: skip an unreadable file rather than failing the whole partition read.
          LOG.warn("Can't create ShuffleReaderHandler for " + filePrefix, e);
        }
      }
      // Randomize reading order so concurrent readers don't all hammer the same file first.
      Collections.shuffle(readHandlers);
      LOG.info(
          "Reading order of Hadoop files with name prefix: {}",
          readHandlers.stream().map(x -> x.filePrefix).collect(Collectors.toList()));
    }
  }