in server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java [595:646]
/**
 * Flushes the buffers of the requested shuffles until enough bytes have been picked.
 *
 * <p>Iterates over {@code bufferPool}. For every app that has an entry in {@code requiredFlush}
 * and is not reported as expired by {@code shuffleTaskManager}, the app read lock is requested
 * with a timeout of {@code flushTryLockTimeout} milliseconds; apps whose lock cannot be obtained
 * in time are skipped. Under the lock, every buffer of each requested shuffle id is handed to
 * {@code flushBuffer}. The round ends as soon as the accumulated encoded length exceeds {@code
 * highWaterMark - lowWaterMark}.
 *
 * <p>If the calling thread is interrupted while waiting for a lock, its interrupt status is
 * restored and the round ends early.
 *
 * @param requiredFlush app id mapped to the shuffle ids whose buffers should be flushed
 */
private synchronized void flush(Map<String, Set<Integer>> requiredFlush) {
  long pickedFlushSize = 0L;
  long expectedFlushSize = highWaterMark - lowWaterMark;
  for (Map.Entry<String, Map<Integer, RangeMap<Integer, ShuffleBuffer>>> appIdToBuffers :
      bufferPool.entrySet()) {
    String appId = appIdToBuffers.getKey();
    // Looked up once per app; a missing (or null) entry means nothing was requested for it.
    Set<Integer> requiredShuffleIds = requiredFlush.get(appId);
    if (requiredShuffleIds == null || shuffleTaskManager.isAppExpired(appId)) {
      continue;
    }
    ReentrantReadWriteLock.ReadLock readLock = shuffleTaskManager.getAppReadLock(appId);
    boolean lockAcquired = false;
    try {
      lockAcquired = readLock.tryLock(flushTryLockTimeout, TimeUnit.MILLISECONDS);
      if (!lockAcquired) {
        // Could not get the lock within the timeout; move on to the next app.
        continue;
      }
      for (Map.Entry<Integer, RangeMap<Integer, ShuffleBuffer>> shuffleIdToBuffers :
          appIdToBuffers.getValue().entrySet()) {
        int shuffleId = shuffleIdToBuffers.getKey();
        if (!requiredShuffleIds.contains(shuffleId)) {
          continue;
        }
        for (Map.Entry<Range<Integer>, ShuffleBuffer> rangeEntry :
            shuffleIdToBuffers.getValue().asMapOfRanges().entrySet()) {
          Range<Integer> range = rangeEntry.getKey();
          ShuffleBuffer shuffleBuffer = rangeEntry.getValue();
          // The size is read before flushBuffer is invoked on this buffer.
          pickedFlushSize += shuffleBuffer.getEncodedLength();
          flushBuffer(
              shuffleBuffer,
              appId,
              shuffleId,
              range.lowerEndpoint(),
              range.upperEndpoint(),
              HugePartitionUtils.isHugePartition(
                  shuffleTaskManager, appId, shuffleId, range.lowerEndpoint()));
          // Checked after the flush call, so the buffer crossing the threshold is included.
          if (pickedFlushSize > expectedFlushSize) {
            LOG.info("Already picked enough buffers to flush {} bytes", pickedFlushSize);
            return;
          }
        }
      }
    } catch (InterruptedException e) {
      LOG.warn("Ignore the InterruptedException which should be caused by internal killed");
      // Throwing InterruptedException cleared the interrupt status; restore it so the caller
      // can still observe the interruption. Stop here: with the flag set, every further timed
      // tryLock would fail immediately.
      Thread.currentThread().interrupt();
      return;
    } finally {
      // Also runs on the early returns above, so the read lock is never leaked.
      if (lockAcquired) {
        readLock.unlock();
      }
    }
  }
}