public static List getChunkOffsetsFromShuffleBlockInfos()

in common/src/main/java/org/apache/celeborn/common/util/ShuffleBlockInfoUtils.java [42:102]


  /**
   * Computes the list of chunk boundary offsets for the blocks of the map indexes in
   * {@code [startMapIndex, maxMapIndex)}, visiting map indexes in ascending order and the blocks
   * of each map index in list order.
   *
   * <p>In-memory mode ({@code isInMemory == true}): offsets are relative to a fresh buffer, so the
   * list always starts with {@code 0}. Block lengths are accumulated, and a boundary is emitted
   * whenever the bytes accumulated since the previous boundary strictly exceed
   * {@code fetchChunkSize}. A final boundary is appended for any trailing bytes.
   *
   * <p>Otherwise: the offset of the first visited block is the first boundary, a new boundary is
   * emitted at a block's offset when that offset is at least {@code fetchChunkSize} past the
   * previous boundary, and the end of the last visited block ({@code offset + length}) is
   * appended unless that value is already in the list.
   *
   * @param startMapIndex first map index to include (inclusive)
   * @param endMapIndex map index to stop at (exclusive); {@code Integer.MAX_VALUE} means "not a
   *     range read", in which case the upper bound is the largest key of {@code indexMap} plus one
   * @param fetchChunkSize chunk size threshold used to decide where to place boundaries
   * @param indexMap block infos keyed by map index; map indexes without an entry are skipped
   * @param isInMemory selects between the two offset layouts described above
   * @return the chunk boundary offsets; when no block is visited this is {@code [0]} in in-memory
   *     mode and an empty list otherwise
   */
  public static List<Long> getChunkOffsetsFromShuffleBlockInfos(
      int startMapIndex,
      int endMapIndex,
      long fetchChunkSize,
      Map<Integer, List<ShuffleBlockInfo>> indexMap,
      boolean isInMemory) {
    List<Long> sortedChunkOffset = new ArrayList<>();
    int maxMapIndex = endMapIndex;
    if (endMapIndex == Integer.MAX_VALUE) {
      // Not a range read: iterate up to the largest map index present in the index map.
      // An empty index map has no largest key; fall back to startMapIndex so that the loops
      // below do not run, instead of failing with NoSuchElementException on Optional.get().
      maxMapIndex =
          indexMap.keySet().stream()
              .max(Integer::compareTo)
              .map(maxKey -> maxKey + 1)
              .orElse(startMapIndex);
    }

    if (isInMemory) {
      long currentChunkOffset = 0;
      long lastChunkOffset = 0;
      // These sorted chunk offsets are used by the fetch handler.
      // The sorted byte buf is a new composite byte buf containing only the required data.
      // It does not reuse the old buffer of the memory file, so the offsets start from 0.
      sortedChunkOffset.add(0L);
      for (int i = startMapIndex; i < maxMapIndex; i++) {
        List<ShuffleBlockInfo> blockInfos = indexMap.get(i);
        if (blockInfos != null) {
          for (ShuffleBlockInfo info : blockInfos) {
            currentChunkOffset += info.length;
            // Close the current chunk once it has grown strictly beyond the threshold.
            if (currentChunkOffset - lastChunkOffset > fetchChunkSize) {
              lastChunkOffset = currentChunkOffset;
              sortedChunkOffset.add(currentChunkOffset);
            }
          }
        }
      }
      // Trailing bytes after the last emitted boundary form the final chunk.
      if (lastChunkOffset != currentChunkOffset) {
        sortedChunkOffset.add(currentChunkOffset);
      }
    } else {
      ShuffleBlockInfo lastBlock = null;
      for (int i = startMapIndex; i < maxMapIndex; i++) {
        List<ShuffleBlockInfo> blockInfos = indexMap.get(i);
        if (blockInfos != null) {
          for (ShuffleBlockInfo info : blockInfos) {
            // The very first visited block defines where the first chunk starts.
            if (sortedChunkOffset.isEmpty()) {
              sortedChunkOffset.add(info.offset);
            }
            // Start a new chunk at this block when it lies at least fetchChunkSize past the
            // start of the current chunk.
            if (info.offset - sortedChunkOffset.get(sortedChunkOffset.size() - 1)
                >= fetchChunkSize) {
              sortedChunkOffset.add(info.offset);
            }
            lastBlock = info;
          }
        }
      }
      if (lastBlock != null) {
        // Terminate the last chunk at the end of the last visited block, unless that exact
        // offset has already been recorded.
        long endChunkOffset = lastBlock.length + lastBlock.offset;
        if (!sortedChunkOffset.contains(endChunkOffset)) {
          sortedChunkOffset.add(endChunkOffset);
        }
      }
    }
    return sortedChunkOffset;
  }