protected void updateBufferSegmentsAndResultBlocks()

in server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java [178:240]


  /**
   * Collects in-memory blocks that come after {@code lastBlockId} into {@code bufferSegments} and
   * {@code resultBlocks}.
   *
   * <p>Batches held in {@code inFlushBlockMap} are visited first, in the order produced by
   * {@code sortFlushingEventId}, and {@code blocksMap} is visited afterwards, so data is read in
   * the order it was received. The actual selection of blocks is delegated to
   * {@code updateSegments}; its boolean result is treated here as "the requested block id was
   * located in that map".
   *
   * <p>If there is data in memory but nothing was gathered from the flushing batches and the
   * requested block id was never located, the method calls itself once more with
   * {@code Constants.INVALID_BLOCK_ID} so that reading restarts from the beginning.
   *
   * @param lastBlockId block id to resume after; {@code Constants.INVALID_BLOCK_ID} is used by
   *     this method to mean "start from the beginning"
   * @param readBufferSize size limit compared against the accumulated data length
   * @param bufferSegments output list of segments; its data length drives the running offset
   * @param resultBlocks output list handed to {@code updateSegments}
   * @param expectedTaskIds passed through unchanged to {@code updateSegments}
   */
  protected void updateBufferSegmentsAndResultBlocks(
      long lastBlockId,
      long readBufferSize,
      List<BufferSegment> bufferSegments,
      List<ShufflePartitionedBlock> resultBlocks,
      Roaring64NavigableMap expectedTaskIds) {
    // Snapshot the ids of the batches currently being flushed and put them in read order.
    List<Long> flushingEventIds =
        sortFlushingEventId(Lists.newArrayList(inFlushBlockMap.keySet()));
    long cursorBlockId = lastBlockId;
    int collectedLength = 0;
    boolean cursorLocated = false;

    // Flushing batches go first so that the read order follows the receive order.
    // Only a handful of batches are expected to be in flushing state at any time
    // (the original note says fewer than 5, otherwise storage is likely misbehaving).
    if (!inFlushBlockMap.isEmpty()) {
      for (Long eventId : flushingEventIds) {
        cursorLocated =
            updateSegments(
                collectedLength,
                inFlushBlockMap.get(eventId),
                readBufferSize,
                cursorBlockId,
                bufferSegments,
                resultBlocks,
                expectedTaskIds);
        // Once the requested block id has been located, every following map is read
        // from its start, which is requested via the invalid block id.
        cursorBlockId = cursorLocated ? Constants.INVALID_BLOCK_ID : cursorBlockId;
        if (!bufferSegments.isEmpty()) {
          collectedLength = calculateDataLength(bufferSegments);
        }
        if (collectedLength >= readBufferSize) {
          break;
        }
      }
    }

    // Continue with the cached blocks that are not in the flush queue, if there is room left.
    if (collectedLength < readBufferSize && !blocksMap.isEmpty()) {
      cursorLocated =
          updateSegments(
              collectedLength,
              blocksMap,
              readBufferSize,
              cursorBlockId,
              bufferSegments,
              resultBlocks,
              expectedTaskIds);
    }

    // collectedLength is only refreshed inside the flushing loop above, so at this point it
    // reflects what was gathered from the flushing batches.
    if (cursorLocated || collectedLength != 0) {
      return;
    }
    if (inFlushBlockMap.isEmpty() && blocksMap.isEmpty()) {
      return;
    }
    // The requested block id was not found (it has presumably been flushed already) while
    // data is still held in memory: read again from the beginning.
    updateBufferSegmentsAndResultBlocks(
        Constants.INVALID_BLOCK_ID,
        readBufferSize,
        bufferSegments,
        resultBlocks,
        expectedTaskIds);
  }