protected List<ShuffleDataSegment> splitCommon(ShuffleIndexResult, Predicate<Long>)

in common/src/main/java/org/apache/uniffle/common/segment/AbstractSegmentSplitter.java [43:136]


  /**
   * Splits the index entries of {@code shuffleIndexResult} into {@link ShuffleDataSegment}s.
   *
   * <p>Each index entry is read from the index buffer as: offset (long), length (int),
   * uncompressed length (int), crc (long), block id (long), task attempt id (long). Consecutive
   * accepted entries are packed into one segment; the current segment is closed when its
   * accumulated size reaches {@code readBufferSize}, when the storage id changes, or when an entry
   * rejected by {@code taskFilter} is encountered.
   *
   * <p>When storage ids are present, an offset smaller than the previous entry's offset is treated
   * as the start of the next storage's entries, advancing to the next element of the storage id
   * array. Without storage ids, every segment gets storage id {@code -1}.
   *
   * <p>Reading stops early (keeping what was collected so far) once the cumulative length of all
   * entries read, accepted or not, exceeds the data file length, unless that length is {@code -1}.
   *
   * <p>NOTE(review): reads advance the position of the buffer returned by {@code getIndexData()};
   * if that buffer is shared, a second call on the same result sees no data — confirm.
   *
   * @param shuffleIndexResult index data to split; {@code null} or empty yields an empty list
   * @param taskFilter optional filter on task attempt id; {@code null} accepts every entry
   * @return the resulting segments, in index order
   * @throws RssException if the index data ends with a truncated entry, or if the index switches
   *     storage more times than there are storage ids
   */
  protected List<ShuffleDataSegment> splitCommon(
      ShuffleIndexResult shuffleIndexResult, Predicate<Long> taskFilter) {
    if (shuffleIndexResult == null || shuffleIndexResult.isEmpty()) {
      return Lists.newArrayList();
    }

    ByteBuffer indexData = shuffleIndexResult.getIndexData();
    long dataFileLen = shuffleIndexResult.getDataFileLen();
    int[] storageIds = shuffleIndexResult.getStorageIds();
    boolean hasStorageIds = storageIds != null && storageIds.length > 0;

    List<BufferSegment> bufferSegments = Lists.newArrayList();
    List<ShuffleDataSegment> dataFileSegments = Lists.newArrayList();
    int bufferOffset = 0;
    long fileOffset = -1;
    long totalLength = 0;

    int storageIndex = 0;
    long preOffset = -1;
    // Storage id of the entries currently buffered in bufferSegments (-1 until one is accepted).
    int preStorageId = -1;

    while (indexData.hasRemaining()) {
      try {
        final long offset = indexData.getLong();
        final int length = indexData.getInt();
        final int uncompressLength = indexData.getInt();
        final long crc = indexData.getLong();
        final long blockId = indexData.getLong();
        final long taskAttemptId = indexData.getLong();

        final int currentStorageId;
        if (!hasStorageIds) {
          currentStorageId = -1;
        } else {
          if (preOffset > offset) {
            // A decreasing offset marks the start of the next storage's entries.
            storageIndex++;
            if (storageIndex >= storageIds.length) {
              LOGGER.warn("storageIds length {} is not enough.", storageIds.length);
              // Previously this fell through to an ArrayIndexOutOfBoundsException.
              throw new RssException(
                  "Index data references more storages than the "
                      + storageIds.length
                      + " storage ids provided, offset: "
                      + offset
                      + ", blockId: "
                      + blockId);
            }
          }
          currentStorageId = storageIds[storageIndex];
        }
        preOffset = offset;

        // Counts every entry, including those rejected by taskFilter.
        totalLength += length;

        if (dataFileLen != -1 && totalLength > dataFileLen) {
          LOGGER.info(
              "Abort inconsistent data, the data length: {}(bytes) recorded in index file is greater than "
                  + "the real data file length: {}(bytes). Block id: {}. "
                  + "This may happen when the data is flushing, please ignore.",
              totalLength,
              dataFileLen,
              blockId);
          break;
        }

        // Evaluate the filter once so a stateful or costly predicate is consulted consistently.
        boolean accepted = taskFilter == null || taskFilter.test(taskAttemptId);
        boolean storageChanged = preStorageId != -1 && currentStorageId != preStorageId;

        // Close the current segment when it is full, when the storage changes (a segment must
        // not span storages), or when a rejected entry breaks the contiguous byte range.
        if (bufferOffset > 0 && (bufferOffset >= readBufferSize || storageChanged || !accepted)) {
          dataFileSegments.add(
              new ShuffleDataSegment(fileOffset, bufferOffset, preStorageId, bufferSegments));
          bufferSegments = Lists.newArrayList();
          bufferOffset = 0;
          fileOffset = -1;
        }

        if (accepted) {
          if (fileOffset == -1) {
            fileOffset = offset;
          }
          bufferSegments.add(
              new BufferSegment(
                  blockId, bufferOffset, length, uncompressLength, crc, taskAttemptId));
          preStorageId = currentStorageId;
          bufferOffset += length;
        }
      } catch (BufferUnderflowException ue) {
        throw new RssException("Read index data under flow", ue);
      }
    }

    if (bufferOffset > 0) {
      // Use the storage of the buffered entries: after the inconsistent-length break, the last
      // entry read may already belong to the next storage.
      dataFileSegments.add(
          new ShuffleDataSegment(fileOffset, bufferOffset, preStorageId, bufferSegments));
    }

    return dataFileSegments;
  }