public FileInfo getSortedFileInfo()

in worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java [162:233]


  public FileInfo getSortedFileInfo(
      String shuffleKey, String fileName, FileInfo fileInfo, int startMapIndex, int endMapIndex)
      throws IOException {
    String fileId = shuffleKey + "-" + fileName;
    UserIdentifier userIdentifier = fileInfo.getUserIdentifier();

    Set<String> sorted =
        sortedShuffleFiles.computeIfAbsent(shuffleKey, v -> ConcurrentHashMap.newKeySet());
    Set<String> sorting =
        sortingShuffleFiles.computeIfAbsent(shuffleKey, v -> ConcurrentHashMap.newKeySet());

    String sortedFilePath = Utils.getSortedFilePath(fileInfo.getFilePath());
    String indexFilePath = Utils.getIndexFilePath(fileInfo.getFilePath());

    synchronized (sorting) {
      if (sorted.contains(fileId)) {
        return resolve(
            shuffleKey,
            fileId,
            userIdentifier,
            sortedFilePath,
            indexFilePath,
            startMapIndex,
            endMapIndex);
      }
      if (!sorting.contains(fileId)) {
        try {
          FileSorter fileSorter = new FileSorter(fileInfo, fileId, shuffleKey);
          sorting.add(fileId);
          shuffleSortTaskDeque.put(fileSorter);
        } catch (InterruptedException e) {
          logger.info("Sorter scheduler thread is interrupted means worker is shutting down.");
          throw new IOException(
              "Sort scheduler thread is interrupted means worker is shutting down.", e);
        } catch (IOException e) {
          logger.error("File sorter access HDFS failed.", e);
          throw new IOException("File sorter access HDFS failed.", e);
        }
      }
    }

    long sortStartTime = System.currentTimeMillis();
    while (!sorted.contains(fileId)) {
      if (sorting.contains(fileId)) {
        try {
          Thread.sleep(50);
          if (System.currentTimeMillis() - sortStartTime > sortTimeout) {
            logger.error("Sorting file {} timeout after {}ms", fileId, sortTimeout);
            throw new IOException(
                "Sort file " + fileInfo.getFilePath() + " timeout after " + sortTimeout);
          }
        } catch (InterruptedException e) {
          logger.error("Sorter scheduler thread is interrupted means worker is shutting down.", e);
          throw new IOException(
              "Sorter scheduler thread is interrupted means worker is shutting down.", e);
        }
      } else {
        logger.debug("Sorting shuffle file for {} {} failed.", shuffleKey, fileInfo.getFilePath());
        throw new IOException(
            "Sorting shuffle file for " + shuffleKey + " " + fileInfo.getFilePath() + " failed.");
      }
    }

    return resolve(
        shuffleKey,
        fileId,
        userIdentifier,
        sortedFilePath,
        indexFilePath,
        startMapIndex,
        endMapIndex);
  }