in server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java [98:212]
public ShuffleBufferManager(
ShuffleServerConf conf, ShuffleFlushManager shuffleFlushManager, boolean nettyServerEnabled) {
long heapSize = Runtime.getRuntime().maxMemory();
this.capacity = conf.getSizeAsBytes(ShuffleServerConf.SERVER_BUFFER_CAPACITY);
if (this.capacity < 0) {
this.capacity =
nettyServerEnabled
? (long)
(NettyUtils.getMaxDirectMemory()
* conf.getDouble(ShuffleServerConf.SERVER_BUFFER_CAPACITY_RATIO))
: (long) (heapSize * conf.getDouble(ShuffleServerConf.SERVER_BUFFER_CAPACITY_RATIO));
}
this.readCapacity = conf.getSizeAsBytes(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY);
if (this.readCapacity < 0) {
this.readCapacity =
nettyServerEnabled
? (long)
(NettyUtils.getMaxDirectMemory()
* conf.getDouble(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY_RATIO))
: (long)
(heapSize * conf.getDouble(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY_RATIO));
}
LOG.info(
"Init shuffle buffer manager with capacity: {}, read buffer capacity: {}.",
capacity,
readCapacity);
this.shuffleFlushManager = shuffleFlushManager;
this.bufferPool = JavaUtils.newConcurrentMap();
this.highWaterMark =
(long)
(capacity
/ 100.0
* conf.get(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE));
this.lowWaterMark =
(long)
(capacity
/ 100.0
* conf.get(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE));
this.bufferFlushEnabled = conf.getBoolean(ShuffleServerConf.SINGLE_BUFFER_FLUSH_ENABLED);
this.bufferFlushThreshold =
conf.getSizeAsBytes(ShuffleServerConf.SINGLE_BUFFER_FLUSH_SIZE_THRESHOLD);
this.bufferFlushBlocksNumThreshold =
conf.getInteger(ShuffleServerConf.SINGLE_BUFFER_FLUSH_BLOCKS_NUM_THRESHOLD);
this.shuffleFlushThreshold =
conf.getSizeAsBytes(ShuffleServerConf.SERVER_SHUFFLE_FLUSH_THRESHOLD);
this.hugePartitionSizeThresholdRef =
conf.getReconfigurableConf(ShuffleServerConf.HUGE_PARTITION_SIZE_THRESHOLD);
this.hugePartitionSizeHardLimitRef =
conf.getReconfigurableConf(ShuffleServerConf.HUGE_PARTITION_SIZE_HARD_LIMIT);
this.hugePartitionSplitLimitRef =
conf.getReconfigurableConf(ShuffleServerConf.HUGE_PARTITION_SPLIT_LIMIT);
this.hugePartitionMemoryLimitSize =
Math.round(
capacity * conf.get(ShuffleServerConf.HUGE_PARTITION_MEMORY_USAGE_LIMITATION_RATIO));
appBlockSizeMetricEnabled =
conf.getBoolean(ShuffleServerConf.APP_LEVEL_SHUFFLE_BLOCK_SIZE_METRIC_ENABLED);
shuffleBufferType = conf.get(ShuffleServerConf.SERVER_SHUFFLE_BUFFER_TYPE);
flushTryLockTimeout = conf.get(ShuffleServerConf.SERVER_SHUFFLE_FLUSH_TRYLOCK_TIMEOUT);
ShuffleServerMetrics.addLabeledCacheGauge(
BLOCK_COUNT_IN_BUFFER_POOL,
() ->
bufferPool.values().stream()
.flatMap(innerMap -> innerMap.values().stream())
.flatMap(rangeMap -> rangeMap.asMapOfRanges().values().stream())
.mapToLong(shuffleBuffer -> shuffleBuffer.getBlockCount())
.sum(),
2 * 60 * 1000L /* 2 minutes */);
ShuffleServerMetrics.addLabeledCacheGauge(
IN_FLUSH_BLOCK_COUNT_IN_BUFFER_POOL,
() ->
bufferPool.values().stream()
.flatMap(innerMap -> innerMap.values().stream())
.flatMap(rangeMap -> rangeMap.asMapOfRanges().values().stream())
.mapToLong(shuffleBuffer -> shuffleBuffer.getInFlushBlockCount())
.sum(),
2 * 60 * 1000L /* 2 minutes */);
ShuffleServerMetrics.addLabeledCacheGauge(
BUFFER_COUNT_IN_BUFFER_POOL,
() ->
bufferPool.values().stream()
.flatMap(innerMap -> innerMap.values().stream())
.mapToLong(rangeMap -> rangeMap.asMapOfRanges().size())
.sum(),
2 * 60 * 1000L /* 2 minutes */);
ShuffleServerMetrics.addLabeledGauge(
SHUFFLE_COUNT_IN_BUFFER_POOL,
() -> bufferPool.values().stream().mapToLong(innerMap -> innerMap.size()).sum());
ReconfigurableRegistry.register(
Sets.newHashSet(
ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE.key(),
ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE.key()),
(theConf, changedProperties) -> {
if (changedProperties == null) {
return;
}
if (changedProperties.contains(
ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE.key())) {
this.highWaterMark =
(long)
(capacity
/ 100.0
* conf.get(
ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE));
}
if (changedProperties.contains(
ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE.key())) {
this.lowWaterMark =
(long)
(capacity
/ 100.0
* conf.get(
ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE));
}
});
}