private MemoryManager()

in worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java [102:245]


  /**
   * Creates the memory manager from the worker configuration.
   *
   * <p>The constructor reads the direct-memory ratios and intervals from {@code conf}, resolves
   * the JVM's maximum direct memory reflectively, converts each ratio into an absolute byte
   * threshold, and then schedules up to three periodic tasks:
   *
   * <ul>
   *   <li>a pressure check that recomputes the serving state and notifies the registered memory
   *       pressure listeners (pause / resume / trim);
   *   <li>a usage report that logs the current direct memory counters;
   *   <li>a read-buffer-target update, only scheduled when the read buffer threshold is positive.
   * </ul>
   *
   * @param conf configuration supplying the memory ratios and the scheduling intervals
   * @throws RuntimeException raised by {@code Preconditions.checkArgument} when the maximum
   *     direct memory is not positive or when the ratios do not satisfy {@code pauseReplicate >
   *     pausePushData > resume > readBuffer + shuffleStorage}
   */
  private MemoryManager(CelebornConf conf) {
    double pausePushDataRatio = conf.workerDirectMemoryRatioToPauseReceive();
    double pauseReplicateRatio = conf.workerDirectMemoryRatioToPauseReplicate();
    double resumeRatio = conf.workerDirectMemoryRatioToResume();
    double maxSortMemRatio = conf.partitionSorterDirectMemoryRatioThreshold();
    double readBufferRatio = conf.workerDirectMemoryRatioForReadBuffer();
    double shuffleStorageRatio = conf.workerDirectMemoryRatioForShuffleStorage();
    long checkInterval = conf.workerDirectMemoryPressureCheckIntervalMs();
    long reportInterval = conf.workerDirectMemoryReportIntervalSecond();
    double readBufferTargetRatio = conf.readBufferTargetRatio();
    long readBufferTargetUpdateInterval = conf.readBufferTargetUpdateInterval();
    long readBufferTargetNotifyThreshold = conf.readBufferTargetNotifyThreshold();

    // The VM class moved between JDK releases, so both candidates are tried reflectively.
    maxDirectorMemory =
        DynMethods.builder("maxDirectMemory")
            .impl("jdk.internal.misc.VM") // for Java 10 and above
            .impl("sun.misc.VM") // for Java 9 and previous
            .buildStatic()
            .<Long>invoke();

    // Fail fast with the offending values so a misconfiguration can be diagnosed from the error.
    Preconditions.checkArgument(
        maxDirectorMemory > 0, "Max direct memory must be positive, but was %s", maxDirectorMemory);
    Preconditions.checkArgument(
        pauseReplicateRatio > pausePushDataRatio,
        "Pause replicate ratio (%s) must be greater than pause push data ratio (%s)",
        pauseReplicateRatio,
        pausePushDataRatio);
    Preconditions.checkArgument(
        pausePushDataRatio > resumeRatio,
        "Pause push data ratio (%s) must be greater than resume ratio (%s)",
        pausePushDataRatio,
        resumeRatio);
    Preconditions.checkArgument(
        resumeRatio > (readBufferRatio + shuffleStorageRatio),
        "Resume ratio (%s) must be greater than the sum of read buffer ratio (%s) "
            + "and shuffle storage ratio (%s)",
        resumeRatio,
        readBufferRatio,
        shuffleStorageRatio);

    // Turn every ratio into an absolute byte threshold relative to the max direct memory.
    maxSortMemory = ((long) (maxDirectorMemory * maxSortMemRatio));
    pausePushDataThreshold = (long) (maxDirectorMemory * pausePushDataRatio);
    pauseReplicateThreshold = (long) (maxDirectorMemory * pauseReplicateRatio);
    resumeThreshold = (long) (maxDirectorMemory * resumeRatio);
    readBufferThreshold = (long) (maxDirectorMemory * readBufferRatio);
    readBufferTarget = (long) (readBufferThreshold * readBufferTargetRatio);
    memoryShuffleStorageThreshold = (long) (maxDirectorMemory * shuffleStorageRatio);

    // Periodic pressure check: recompute the serving state and react to transitions.
    checkService.scheduleWithFixedDelay(
        () -> {
          try {
            ServingState lastState = servingState;
            servingState = currentServingState();
            if (lastState != servingState) {
              logger.info("Serving state transformed from {} to {}", lastState, servingState);
              if (servingState == ServingState.PUSH_PAUSED) {
                pausePushDataCounter.increment();
                logger.info("Trigger action: PAUSE PUSH, RESUME REPLICATE");
                memoryPressureListeners.forEach(
                    memoryPressureListener ->
                        memoryPressureListener.onPause(TransportModuleConstants.PUSH_MODULE));
                memoryPressureListeners.forEach(
                    memoryPressureListener ->
                        memoryPressureListener.onResume(TransportModuleConstants.REPLICATE_MODULE));
                trimAllListeners();
              } else if (servingState == ServingState.PUSH_AND_REPLICATE_PAUSED) {
                pausePushDataAndReplicateCounter.increment();
                logger.info("Trigger action: PAUSE PUSH and REPLICATE");
                memoryPressureListeners.forEach(
                    memoryPressureListener ->
                        memoryPressureListener.onPause(TransportModuleConstants.PUSH_MODULE));
                memoryPressureListeners.forEach(
                    memoryPressureListener ->
                        memoryPressureListener.onPause(TransportModuleConstants.REPLICATE_MODULE));
                trimAllListeners();
              } else {
                logger.info("Trigger action: RESUME PUSH and REPLICATE");
                memoryPressureListeners.forEach(
                    memoryPressureListener -> memoryPressureListener.onResume("all"));
              }
            } else {
              // State unchanged: keep trimming on every tick for as long as something is paused.
              if (servingState != ServingState.NONE_PAUSED) {
                logger.debug("Trigger action: TRIM");
                trimAllListeners();
              }
            }
          } catch (Exception e) {
            // Swallow and log: an exception escaping the task would cancel all future runs.
            logger.error("Memory tracker check error", e);
          }
        },
        checkInterval,
        checkInterval,
        TimeUnit.MILLISECONDS);

    // Periodic usage report.
    reportService.scheduleWithFixedDelay(
        () -> {
          try {
            logger.info(
                "Direct memory usage: {}/{}, disk buffer size: {}, sort memory size: {}, read buffer size: {}",
                Utils.bytesToString(getNettyUsedDirectMemory()),
                Utils.bytesToString(maxDirectorMemory),
                Utils.bytesToString(diskBufferCounter.get()),
                Utils.bytesToString(sortMemoryCounter.get()),
                Utils.bytesToString(readBufferCounter.get()));
          } catch (Exception e) {
            // Same guard as the other periodic tasks: without it a single failure would
            // silently stop every subsequent report.
            logger.error("Memory usage report error", e);
          }
        },
        reportInterval,
        reportInterval,
        TimeUnit.SECONDS);

    if (readBufferThreshold > 0) {
      // if read buffer threshold is zero means that there will be no map data partitions
      readBufferDispatcher = new ReadBufferDispatcher(this, conf);
      readBufferTargetChangeListeners = new ArrayList<>();
      readBufferTargetUpdateService.scheduleWithFixedDelay(
          () -> {
            try {
              if (creditStreamManager != null) {
                int mapDataPartitionCount = creditStreamManager.getActiveMapPartitionCount();
                if (mapDataPartitionCount > 0) {
                  // Split the overall target evenly across active map partitions, rounding up.
                  long currentTarget =
                      (long) Math.ceil(readBufferTarget * 1.0 / mapDataPartitionCount);
                  // Only notify when the target moved by more than the configured threshold.
                  if (Math.abs(lastNotifiedTarget - currentTarget)
                      > readBufferTargetNotifyThreshold) {
                    synchronized (readBufferTargetChangeListeners) {
                      logger.debug(
                          "read buffer target changed {} -> {} active map partition count {}",
                          lastNotifiedTarget,
                          currentTarget,
                          mapDataPartitionCount);
                      for (ReadBufferTargetChangeListener changeListener :
                          readBufferTargetChangeListeners) {
                        changeListener.onChange(currentTarget);
                      }
                      lastNotifiedTarget = currentTarget;
                    }
                  }
                }
              }
            } catch (Exception e) {
              logger.warn("Failed update buffer target", e);
            }
          },
          readBufferTargetUpdateInterval,
          readBufferTargetUpdateInterval,
          TimeUnit.MILLISECONDS);
    }

    logger.info(
        "Memory tracker initialized with: "
            + "max direct memory: {}, pause push memory: {}, "
            + "pause replication memory: {}, resume memory: {}, "
            + "read buffer memory limit: {} target: {}, "
            + "memory shuffle storage limit: {}",
        Utils.bytesToString(maxDirectorMemory),
        Utils.bytesToString(pausePushDataThreshold),
        Utils.bytesToString(pauseReplicateThreshold),
        Utils.bytesToString(resumeThreshold),
        Utils.bytesToString(readBufferThreshold),
        Utils.bytesToString(readBufferTarget),
        Utils.bytesToString(memoryShuffleStorageThreshold));
  }