public List&lt;ShuffleDataSegment&gt; split(ShuffleIndexResult shuffleIndexResult)

in common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java [62:162]


  public List<ShuffleDataSegment> split(ShuffleIndexResult shuffleIndexResult) {
    if (shuffleIndexResult == null || shuffleIndexResult.isEmpty()) {
      return Lists.newArrayList();
    }

    ByteBuffer indexData = shuffleIndexResult.getIndexData();
    long dataFileLen = shuffleIndexResult.getDataFileLen();

    List<BufferSegment> bufferSegments = Lists.newArrayList();

    List<ShuffleDataSegment> dataFileSegments = Lists.newArrayList();
    int bufferOffset = 0;
    long fileOffset = -1;
    long totalLen = 0;

    long lastExpectedBlockIndex = -1;

    List<Long> indexTaskIds = Lists.newArrayList();

    /**
     * One ShuffleDataSegment should meet the following requirements:
     *
     * <p>1. Its taskId is in the [startMapIndex, endMapIndex) taskIds bitmap. Attention: the index
     * in the range is not the map task id, which means the required task ids are not necessarily
     * continuous.
     *
     * <p>2. The ShuffleDataSegment size should be < readBufferSize.
     *
     * <p>3. A single ShuffleDataSegment's blocks should be continuous.
     */
    int index = 0;
    while (indexData.hasRemaining()) {
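      // Each index entry is a fixed 40-byte record:
      // offset(long) + length(int) + uncompressLength(int) + crc(long) + blockId(long)
      // + taskAttemptId(long).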
      try {
        long offset = indexData.getLong();
        int length = indexData.getInt();
        int uncompressLength = indexData.getInt();
        long crc = indexData.getLong();
        long blockId = indexData.getLong();
        long taskAttemptId = indexData.getLong();

        totalLen += length;
        indexTaskIds.add(taskAttemptId);

        // If the ShuffleServer is flushing the file at this time, the length recorded in the
        // index file may be greater than the length of the actual data file, so return early
        // here to avoid an EOFException.
        if (dataFileLen != -1 && totalLen > dataFileLen) {
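          // The blockId packs the partition id directly above the task attempt id bits, so
          // shifting off TASK_ATTEMPT_ID_MAX_LENGTH bits and masking PARTITION_ID_MAX_LENGTH
          // bits recovers the partition id for the log message below.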
          long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
          LOGGER.info(
              "Abort inconsistent data: the data length {} (bytes) recorded in the index file is "
                  + "greater than the real data file length {} (bytes) for partition id {}. "
                  + "This may happen while the data is still being flushed; if so, please ignore.",
              totalLen,
              dataFileLen,
              Math.toIntExact((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask));
          break;
        }

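        // Cut the current segment either when expected blocks stop being contiguous in the
        // index file, or when the accumulated buffer has reached the read buffer size.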
        boolean conditionOfDiscontinuousBlocks =
            lastExpectedBlockIndex != -1
                && !bufferSegments.isEmpty()
                && expectTaskIds.contains(taskAttemptId)
                && index - lastExpectedBlockIndex != 1;

        boolean conditionOfLimitedBufferSize = bufferOffset >= readBufferSize;

        if (conditionOfDiscontinuousBlocks || conditionOfLimitedBufferSize) {
          ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments);
          dataFileSegments.add(sds);
          bufferSegments = Lists.newArrayList();
          bufferOffset = 0;
          fileOffset = -1;
        }

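        // Only blocks from expected task attempts are collected; the first such block in a
        // segment fixes the segment's file offset.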
        if (expectTaskIds.contains(taskAttemptId)) {
          if (fileOffset == -1) {
            fileOffset = offset;
          }
          bufferSegments.add(
              new BufferSegment(
                  blockId, bufferOffset, length, uncompressLength, crc, taskAttemptId));
          bufferOffset += length;
          lastExpectedBlockIndex = index;
        }
        index++;
      } catch (BufferUnderflowException ue) {
        throw new RssException("Read index data underflow", ue);
      }
    }

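    // Flush the trailing segment that has not yet reached readBufferSize.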
    if (bufferOffset > 0) {
      ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments);
      dataFileSegments.add(sds);
    }

    if (LOGGER.isDebugEnabled()) {
      LOGGER.debug(
          "Index file task-ids sequence: {}, expected task-ids: {}",
          indexTaskIds,
          getExpectedTaskIds(expectTaskIds));
    }
    return dataFileSegments;
  }
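
A minimal usage sketch (not part of the source file): the constructor shape and the Roaring64NavigableMap bitmap type for expectTaskIds are assumptions inferred from the fields this method reads, not confirmed by this excerpt.

  // Hypothetical caller: split an index file into read-sized segments for two task attempts.
  Roaring64NavigableMap expectTaskIds = Roaring64NavigableMap.bitmapOf(2L, 5L);
  LocalOrderSegmentSplitter splitter =
      new LocalOrderSegmentSplitter(expectTaskIds, 14 * 1024 * 1024); // assumed ctor args
  List<ShuffleDataSegment> segments = splitter.split(shuffleIndexResult);
  for (ShuffleDataSegment segment : segments) {
    // Each segment describes one contiguous region of the data file whose blocks all
    // belong to the expected task attempts.
  }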