public FileInfo getSortedFileInfo()

in worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java [203:304]


  public FileInfo getSortedFileInfo(
      String shuffleKey, String fileName, FileInfo fileInfo, int startMapIndex, int endMapIndex)
      throws IOException {
    if (fileInfo instanceof MemoryFileInfo) {
      MemoryFileInfo memoryFileInfo = ((MemoryFileInfo) fileInfo);
      Map<Integer, List<ShuffleBlockInfo>> indexesMap;
      sortMemoryShuffleFile(memoryFileInfo);
      indexesMap = memoryFileInfo.getSortedIndexes();

      ReduceFileMeta reduceFileMeta =
          new ReduceFileMeta(
              ShuffleBlockInfoUtils.getChunkOffsetsFromShuffleBlockInfos(
                  startMapIndex, endMapIndex, shuffleChunkSize, indexesMap, true),
              shuffleChunkSize);
      CompositeByteBuf targetBuffer =
          MemoryManager.instance().getStorageByteBufAllocator().compositeBuffer(Integer.MAX_VALUE);
      ShuffleBlockInfoUtils.sliceSortedBufferByMapRange(
          startMapIndex,
          endMapIndex,
          indexesMap,
          memoryFileInfo.getSortedBuffer(),
          targetBuffer,
          shuffleChunkSize);
      return new MemoryFileInfo(
          memoryFileInfo.getUserIdentifier(),
          memoryFileInfo.isPartitionSplitEnabled(),
          reduceFileMeta,
          targetBuffer);
    } else {
      DiskFileInfo diskFileInfo = ((DiskFileInfo) fileInfo);
      String fileId = shuffleKey + "-" + fileName;
      UserIdentifier userIdentifier = diskFileInfo.getUserIdentifier();
      Set<String> sorted =
          sortedShuffleFiles.computeIfAbsent(shuffleKey, v -> ConcurrentHashMap.newKeySet());
      Set<String> sorting =
          sortingShuffleFiles.computeIfAbsent(shuffleKey, v -> ConcurrentHashMap.newKeySet());

      String sortedFilePath = Utils.getSortedFilePath(diskFileInfo.getFilePath());
      String indexFilePath = Utils.getIndexFilePath(diskFileInfo.getFilePath());
      boolean fileSorting = true;
      synchronized (sorting) {
        if (sorted.contains(fileId)) {
          fileSorting = false;
        } else if (!sorting.contains(fileId)) {
          try {
            FileSorter fileSorter = new FileSorter(diskFileInfo, fileId, shuffleKey);
            sorting.add(fileId);
            logger.debug(
                "Adding sorter to sort queue shuffle key {}, file name {}", shuffleKey, fileName);
            shuffleSortTaskDeque.put(fileSorter);
          } catch (InterruptedException e) {
            logger.error(
                "Sorter scheduler thread is interrupted means worker is shutting down.", e);
            throw new IOException(
                "Sort scheduler thread is interrupted means worker is shutting down.", e);
          } catch (IOException e) {
            logger.error("File sorter access DFS failed.", e);
            throw new IOException("File sorter access DFS failed.", e);
          }
        }
      }

      if (fileSorting) {
        long sortStartTime = System.currentTimeMillis();
        while (!sorted.contains(fileId)) {
          if (sorting.contains(fileId)) {
            try {
              Thread.sleep(50);
              if (System.currentTimeMillis() - sortStartTime > sortTimeout) {
                logger.error("Sorting file {} timeout after {}ms", fileId, sortTimeout);
                throw new IOException(
                    "Sort file " + diskFileInfo.getFilePath() + " timeout after " + sortTimeout);
              }
            } catch (InterruptedException e) {
              logger.error(
                  "Sorter scheduler thread is interrupted means worker is shutting down.", e);
              throw new IOException(
                  "Sorter scheduler thread is interrupted means worker is shutting down.", e);
            }
          } else {
            logger.debug(
                "Sorting shuffle file for {} {} failed.", shuffleKey, diskFileInfo.getFilePath());
            throw new IOException(
                "Sorting shuffle file for "
                    + shuffleKey
                    + " "
                    + diskFileInfo.getFilePath()
                    + " failed.");
          }
        }
      }

      return resolve(
          shuffleKey,
          fileId,
          userIdentifier,
          sortedFilePath,
          indexFilePath,
          startMapIndex,
          endMapIndex);
    }
  }