private synchronized void flush()

in server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java [595:646]


  /**
   * Flushes the buffers of the requested shuffles until enough bytes have been picked.
   *
   * <p>Walks {@code bufferPool} and, for every app present in {@code requiredFlush} that is not
   * reported as expired, tries to take the app's read lock for at most {@code
   * flushTryLockTimeout} milliseconds. Apps whose lock cannot be obtained in time are skipped.
   * While the lock is held, every buffer of each requested shuffle is handed to {@code
   * flushBuffer}. The pass ends as soon as the accumulated encoded length of the picked buffers
   * exceeds {@code highWaterMark - lowWaterMark}.
   *
   * <p>If the thread is interrupted, its interrupt status is restored and the pass stops, so the
   * caller can observe the interruption instead of this method waiting on further locks.
   *
   * @param requiredFlush app id mapped to the shuffle ids of that app whose buffers should be
   *     flushed; apps without an entry are left untouched
   */
  private synchronized void flush(Map<String, Set<Integer>> requiredFlush) {
    long pickedFlushSize = 0L;
    long expectedFlushSize = highWaterMark - lowWaterMark;
    for (Map.Entry<String, Map<Integer, RangeMap<Integer, ShuffleBuffer>>> appIdToBuffers :
        bufferPool.entrySet()) {
      String appId = appIdToBuffers.getKey();
      // Resolve the requested shuffles once per app instead of once per shuffle.
      Set<Integer> requiredShuffleIds = requiredFlush.get(appId);
      if (requiredShuffleIds == null || shuffleTaskManager.isAppExpired(appId)) {
        continue;
      }
      ReentrantReadWriteLock.ReadLock readLock = shuffleTaskManager.getAppReadLock(appId);
      boolean lockAcquired = false;
      try {
        lockAcquired = readLock.tryLock(flushTryLockTimeout, TimeUnit.MILLISECONDS);
        if (!lockAcquired) {
          // Could not get the lock within the timeout: skip this app for this pass.
          continue;
        }
        for (Map.Entry<Integer, RangeMap<Integer, ShuffleBuffer>> shuffleIdToBuffers :
            appIdToBuffers.getValue().entrySet()) {
          int shuffleId = shuffleIdToBuffers.getKey();
          if (!requiredShuffleIds.contains(shuffleId)) {
            continue;
          }
          for (Map.Entry<Range<Integer>, ShuffleBuffer> rangeEntry :
              shuffleIdToBuffers.getValue().asMapOfRanges().entrySet()) {
            Range<Integer> range = rangeEntry.getKey();
            ShuffleBuffer shuffleBuffer = rangeEntry.getValue();
            // The size is counted before the flush is issued, i.e. it tracks picked buffers.
            pickedFlushSize += shuffleBuffer.getEncodedLength();
            flushBuffer(
                shuffleBuffer,
                appId,
                shuffleId,
                range.lowerEndpoint(),
                range.upperEndpoint(),
                HugePartitionUtils.isHugePartition(
                    shuffleTaskManager, appId, shuffleId, range.lowerEndpoint()));
            if (pickedFlushSize > expectedFlushSize) {
              LOG.info("Already picked enough buffers to flush {} bytes", pickedFlushSize);
              return;
            }
          }
        }
      } catch (InterruptedException e) {
        // Restore the interrupt status for the caller and stop: with the flag set, every later
        // timed tryLock would fail immediately anyway.
        Thread.currentThread().interrupt();
        LOG.warn(
            "Interrupted while flushing buffers of app {}, stop picking more buffers", appId);
        return;
      } finally {
        if (lockAcquired) {
          readLock.unlock();
        }
      }
    }
  }