public static List mergeSegments()

in storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java [58:118]


  /**
   * Coalesces the index segments of a shuffle data file into contiguous read ranges.
   *
   * <p>A single segment becomes one range on its own. When more than one segment is given, the
   * list is first sorted in place by the segments' natural ordering and then walked in order.
   * Consecutive segments are accumulated into one {@link DataFileSegment} until either:
   *
   * <ul>
   *   <li>the gap between the end of the previous segment and the offset of the next one is
   *       larger than {@code readBufferSize / 2} bytes; the open range is closed before that
   *       segment, or
   *   <li>the accumulated range spans at least {@code readBufferSize} bytes; the range is closed
   *       right after the segment that crossed the threshold, so it may exceed
   *       {@code readBufferSize}.
   * </ul>
   *
   * <p>Gaps too small to split a range are counted in that range's length. Every
   * {@link BufferSegment} stores its offset relative to the start of its enclosing range.
   *
   * @param path file path copied into each returned {@link DataFileSegment}
   * @param segments index segments to merge; may be {@code null} or empty; sorted in place when it
   *     holds more than one element
   * @param readBufferSize target size, in bytes, of one merged read range
   * @return the merged ranges in offset order, or an empty list when there is nothing to merge
   */
  public static List<DataFileSegment> mergeSegments(
      String path, List<FileBasedShuffleSegment> segments, int readBufferSize) {
    List<DataFileSegment> merged = Lists.newArrayList();
    if (segments == null || segments.isEmpty()) {
      return merged;
    }

    if (segments.size() == 1) {
      // Lone segment: emit it directly. The list is intentionally not sorted on this path.
      FileBasedShuffleSegment only = segments.get(0);
      List<BufferSegment> lone = Lists.newArrayList();
      lone.add(
          new BufferSegment(
              only.getBlockId(),
              0,
              only.getLength(),
              only.getUncompressLength(),
              only.getCrc(),
              only.getTaskAttemptId()));
      merged.add(new DataFileSegment(path, only.getOffset(), only.getLength(), lone));
      return merged;
    }

    Collections.sort(segments);
    long gapLimit = readBufferSize / 2;
    // -1 in rangeStart means "no range is currently open".
    long rangeStart = -1;
    // End (offset + length) of the most recently consumed segment.
    long rangeEnd = -1;
    List<BufferSegment> pending = Lists.newArrayList();

    for (FileBasedShuffleSegment segment : segments) {
      long offset = segment.getOffset();

      // A wide enough hole closes the open range without reading it, e.g. [20, 100] followed by
      // [1000, 1001] leaves the bytes [101, 999] unread.
      if (rangeStart > -1 && offset - rangeEnd > gapLimit) {
        merged.add(new DataFileSegment(path, rangeStart, (int) (rangeEnd - rangeStart), pending));
        rangeStart = -1;
      }

      // Open a fresh range anchored at this segment.
      if (rangeStart == -1) {
        pending = Lists.newArrayList();
        rangeStart = offset;
      }

      rangeEnd = offset + segment.getLength();
      pending.add(
          new BufferSegment(
              segment.getBlockId(),
              offset - rangeStart,
              segment.getLength(),
              segment.getUncompressLength(),
              segment.getCrc(),
              segment.getTaskAttemptId()));

      // Close the range once it is big enough to fill a read buffer.
      if (rangeEnd - rangeStart >= readBufferSize) {
        merged.add(new DataFileSegment(path, rangeStart, (int) (rangeEnd - rangeStart), pending));
        rangeStart = -1;
      }
    }

    // Flush whatever range is still open after the last segment.
    if (rangeStart > -1) {
      merged.add(new DataFileSegment(path, rangeStart, (int) (rangeEnd - rangeStart), pending));
    }
    return merged;
  }