public ShuffleBufferManager()

in server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java [98:212]


  public ShuffleBufferManager(
      ShuffleServerConf conf, ShuffleFlushManager shuffleFlushManager, boolean nettyServerEnabled) {
    long heapSize = Runtime.getRuntime().maxMemory();
    this.capacity = conf.getSizeAsBytes(ShuffleServerConf.SERVER_BUFFER_CAPACITY);
    if (this.capacity < 0) {
      this.capacity =
          nettyServerEnabled
              ? (long)
                  (NettyUtils.getMaxDirectMemory()
                      * conf.getDouble(ShuffleServerConf.SERVER_BUFFER_CAPACITY_RATIO))
              : (long) (heapSize * conf.getDouble(ShuffleServerConf.SERVER_BUFFER_CAPACITY_RATIO));
    }
    this.readCapacity = conf.getSizeAsBytes(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY);
    if (this.readCapacity < 0) {
      this.readCapacity =
          nettyServerEnabled
              ? (long)
                  (NettyUtils.getMaxDirectMemory()
                      * conf.getDouble(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY_RATIO))
              : (long)
                  (heapSize * conf.getDouble(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY_RATIO));
    }
    LOG.info(
        "Init shuffle buffer manager with capacity: {}, read buffer capacity: {}.",
        capacity,
        readCapacity);
    this.shuffleFlushManager = shuffleFlushManager;
    this.bufferPool = JavaUtils.newConcurrentMap();
    this.highWaterMark =
        (long)
            (capacity
                / 100.0
                * conf.get(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE));
    this.lowWaterMark =
        (long)
            (capacity
                / 100.0
                * conf.get(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE));
    this.bufferFlushEnabled = conf.getBoolean(ShuffleServerConf.SINGLE_BUFFER_FLUSH_ENABLED);
    this.bufferFlushThreshold =
        conf.getSizeAsBytes(ShuffleServerConf.SINGLE_BUFFER_FLUSH_SIZE_THRESHOLD);
    this.bufferFlushBlocksNumThreshold =
        conf.getInteger(ShuffleServerConf.SINGLE_BUFFER_FLUSH_BLOCKS_NUM_THRESHOLD);
    this.shuffleFlushThreshold =
        conf.getSizeAsBytes(ShuffleServerConf.SERVER_SHUFFLE_FLUSH_THRESHOLD);
    this.hugePartitionSizeThresholdRef =
        conf.getReconfigurableConf(ShuffleServerConf.HUGE_PARTITION_SIZE_THRESHOLD);
    this.hugePartitionSizeHardLimitRef =
        conf.getReconfigurableConf(ShuffleServerConf.HUGE_PARTITION_SIZE_HARD_LIMIT);
    this.hugePartitionSplitLimitRef =
        conf.getReconfigurableConf(ShuffleServerConf.HUGE_PARTITION_SPLIT_LIMIT);
    this.hugePartitionMemoryLimitSize =
        Math.round(
            capacity * conf.get(ShuffleServerConf.HUGE_PARTITION_MEMORY_USAGE_LIMITATION_RATIO));
    appBlockSizeMetricEnabled =
        conf.getBoolean(ShuffleServerConf.APP_LEVEL_SHUFFLE_BLOCK_SIZE_METRIC_ENABLED);
    shuffleBufferType = conf.get(ShuffleServerConf.SERVER_SHUFFLE_BUFFER_TYPE);
    flushTryLockTimeout = conf.get(ShuffleServerConf.SERVER_SHUFFLE_FLUSH_TRYLOCK_TIMEOUT);
    ShuffleServerMetrics.addLabeledCacheGauge(
        BLOCK_COUNT_IN_BUFFER_POOL,
        () ->
            bufferPool.values().stream()
                .flatMap(innerMap -> innerMap.values().stream())
                .flatMap(rangeMap -> rangeMap.asMapOfRanges().values().stream())
                .mapToLong(shuffleBuffer -> shuffleBuffer.getBlockCount())
                .sum(),
        2 * 60 * 1000L /* 2 minutes */);
    ShuffleServerMetrics.addLabeledCacheGauge(
        IN_FLUSH_BLOCK_COUNT_IN_BUFFER_POOL,
        () ->
            bufferPool.values().stream()
                .flatMap(innerMap -> innerMap.values().stream())
                .flatMap(rangeMap -> rangeMap.asMapOfRanges().values().stream())
                .mapToLong(shuffleBuffer -> shuffleBuffer.getInFlushBlockCount())
                .sum(),
        2 * 60 * 1000L /* 2 minutes */);
    ShuffleServerMetrics.addLabeledCacheGauge(
        BUFFER_COUNT_IN_BUFFER_POOL,
        () ->
            bufferPool.values().stream()
                .flatMap(innerMap -> innerMap.values().stream())
                .mapToLong(rangeMap -> rangeMap.asMapOfRanges().size())
                .sum(),
        2 * 60 * 1000L /* 2 minutes */);
    ShuffleServerMetrics.addLabeledGauge(
        SHUFFLE_COUNT_IN_BUFFER_POOL,
        () -> bufferPool.values().stream().mapToLong(innerMap -> innerMap.size()).sum());
    ReconfigurableRegistry.register(
        Sets.newHashSet(
            ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE.key(),
            ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE.key()),
        (theConf, changedProperties) -> {
          if (changedProperties == null) {
            return;
          }
          if (changedProperties.contains(
              ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE.key())) {
            this.highWaterMark =
                (long)
                    (capacity
                        / 100.0
                        * conf.get(
                            ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE));
          }
          if (changedProperties.contains(
              ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE.key())) {
            this.lowWaterMark =
                (long)
                    (capacity
                        / 100.0
                        * conf.get(
                            ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE));
          }
        });
  }