private MemoryManager()

in worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java [127:296]


  /**
   * Creates the worker memory manager.
   *
   * <p>Reads all memory-related ratios and intervals from {@code conf}, resolves the JVM's maximum
   * direct memory, validates that the configured ratios are mutually consistent, derives the
   * absolute byte thresholds, and schedules the periodic background tasks: serving-state check,
   * direct-memory usage report, read-buffer target updates (only when a read buffer is
   * configured) and memory-file eviction (only when memory file storage is configured and a
   * local/DFS storage is available).
   *
   * @param conf worker configuration supplying all ratios, thresholds and intervals
   * @param storageManager storage manager whose in-memory writers are evicted under memory
   *     pressure; may be {@code null}, in which case memory-file eviction is not scheduled
   * @param source passed through to the {@code ReadBufferDispatcher} when one is created
   * @throws IllegalArgumentException if the maximum direct memory is not positive, or the pause /
   *     resume / read-buffer / memory-file ratios are inconsistent
   */
  private MemoryManager(CelebornConf conf, StorageManager storageManager, AbstractSource source) {
    double pausePushDataRatio = conf.workerDirectMemoryRatioToPauseReceive();
    double pauseReplicateRatio = conf.workerDirectMemoryRatioToPauseReplicate();
    this.directMemoryResumeRatio = conf.workerDirectMemoryRatioToResume();
    this.pinnedMemoryResumeRatio = conf.workerPinnedMemoryRatioToResume();
    double maxSortMemRatio = conf.workerPartitionSorterDirectMemoryRatioThreshold();
    double readBufferRatio = conf.workerDirectMemoryRatioForReadBuffer();
    double memoryFileStorageRatio = conf.workerDirectMemoryRatioForMemoryFilesStorage();
    long checkInterval = conf.workerDirectMemoryPressureCheckIntervalMs();
    this.pinnedMemoryCheckEnabled = conf.workerPinnedMemoryCheckEnabled();
    this.pinnedMemoryCheckInterval = conf.workerPinnedMemoryCheckIntervalMs();
    this.workerPinnedMemoryResumeKeepTime = conf.workerPinnedMemoryResumeKeepTime();
    long reportInterval = conf.workerDirectMemoryReportIntervalSecond();
    double readBufferTargetRatio = conf.readBufferTargetRatio();
    long readBufferTargetUpdateInterval = conf.readBufferTargetUpdateInterval();
    long readBufferTargetNotifyThreshold = conf.readBufferTargetNotifyThreshold();
    boolean aggressiveEvictModeEnabled = conf.workerMemoryFileStorageEictAggressiveModeEnabled();
    double evictRatio = conf.workerMemoryFileStorageEvictRatio();
    forceAppendPauseSpentTimeThreshold = conf.metricsWorkerForceAppendPauseSpentTimeThreshold();
    // The VM class that exposes maxDirectMemory moved between JDK versions; try both locations.
    maxDirectMemory =
        DynMethods.builder("maxDirectMemory")
            .impl("jdk.internal.misc.VM") // for Java 10 and above
            .impl("sun.misc.VM") // for Java 9 and previous
            .buildStatic()
            .<Long>invoke();

    // Template-style messages are only formatted when a check actually fails.
    Preconditions.checkArgument(
        maxDirectMemory > 0, "Max direct memory should be positive, but was %s", maxDirectMemory);
    Preconditions.checkArgument(
        pauseReplicateRatio > pausePushDataRatio,
        "Invalid config, %s(%s) should be greater than %s(%s)",
        CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE().key(),
        pauseReplicateRatio,
        CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE().key(),
        pausePushDataRatio);
    Preconditions.checkArgument(
        pausePushDataRatio > directMemoryResumeRatio,
        "Invalid config, %s(%s) should be greater than direct memory resume ratio(%s)",
        CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE().key(),
        pausePushDataRatio,
        directMemoryResumeRatio);
    if (memoryFileStorageRatio > 0) {
      Preconditions.checkArgument(
          directMemoryResumeRatio > (readBufferRatio + memoryFileStorageRatio),
          "Invalid config, direct memory resume ratio(%s) should be greater than "
              + "read buffer ratio(%s) + memory file storage ratio(%s)",
          directMemoryResumeRatio,
          readBufferRatio,
          memoryFileStorageRatio);
    }

    // Convert the configured ratios into absolute byte thresholds.
    maxSortMemory = ((long) (maxDirectMemory * maxSortMemRatio));
    pausePushDataThreshold = (long) (maxDirectMemory * pausePushDataRatio);
    pauseReplicateThreshold = (long) (maxDirectMemory * pauseReplicateRatio);
    readBufferThreshold = (long) (maxDirectMemory * readBufferRatio);
    readBufferTarget = (long) (readBufferThreshold * readBufferTargetRatio);
    memoryFileStorageThreshold = (long) (maxDirectMemory * memoryFileStorageRatio);

    checkService.scheduleWithFixedDelay(
        () -> {
          try {
            switchServingState();
          } catch (Exception e) {
            logger.error("Memory tracker check error", e);
          }
        },
        checkInterval,
        checkInterval,
        TimeUnit.MILLISECONDS);

    reportService.scheduleWithFixedDelay(
        () -> {
          // Guarded like the other periodic tasks: an exception escaping a fixed-delay task
          // suppresses all of its subsequent executions, which would silently stop reporting.
          try {
            logger.info(
                "Direct memory usage: {}/{}, "
                    + "disk buffer size: {}, "
                    + "sort memory size: {}, "
                    + "read buffer size: {}, "
                    + "memory file storage size : {}",
                Utils.bytesToString(getNettyUsedDirectMemory()),
                Utils.bytesToString(maxDirectMemory),
                Utils.bytesToString(diskBufferCounter.get()),
                Utils.bytesToString(sortMemoryCounter.get()),
                Utils.bytesToString(readBufferCounter.get()),
                Utils.bytesToString(memoryFileStorageCounter.sum()));
          } catch (Exception e) {
            logger.warn("Direct memory usage report error", e);
          }
        },
        reportInterval,
        reportInterval,
        TimeUnit.SECONDS);

    if (readBufferThreshold > 0) {
      // if read buffer threshold is zero means that there will be no map data partitions
      readBufferDispatcher = new ReadBufferDispatcher(this, conf, source);
      readBufferTargetUpdateService.scheduleWithFixedDelay(
          () -> {
            try {
              if (creditStreamManager != null) {
                int mapDataPartitionCount = creditStreamManager.getActiveMapPartitionCount();
                if (mapDataPartitionCount > 0) {
                  // Split the overall read buffer target evenly across active map partitions.
                  long currentTarget =
                      (long) Math.ceil(readBufferTarget * 1.0 / mapDataPartitionCount);
                  // Only notify listeners when the per-partition target moved by more than the
                  // configured threshold.
                  if (Math.abs(lastNotifiedTarget - currentTarget)
                      > readBufferTargetNotifyThreshold) {
                    synchronized (readBufferTargetChangeListeners) {
                      logger.debug(
                          "read buffer target changed {} -> {} active map partition count {}",
                          lastNotifiedTarget,
                          currentTarget,
                          mapDataPartitionCount);
                      for (ReadBufferTargetChangeListener changeListener :
                          readBufferTargetChangeListeners) {
                        changeListener.onChange(currentTarget);
                      }
                      lastNotifiedTarget = currentTarget;
                    }
                  }
                }
              }
            } catch (Exception e) {
              logger.warn("Failed update buffer target", e);
            }
          },
          readBufferTargetUpdateInterval,
          readBufferTargetUpdateInterval,
          TimeUnit.MILLISECONDS);
    }

    this.storageManager = storageManager;
    if (memoryFileStorageThreshold > 0
        && storageManager != null
        && storageManager.localOrDfsStorageAvailable()) {
      // NOTE(review): this executor is only referenced by a local variable, so nothing can shut
      // it down later; consider keeping it in a field and stopping it with the other services.
      ScheduledExecutorService memoryFileStorageService =
          ThreadUtils.newDaemonSingleThreadScheduledExecutor("memory-file-storage-checker");
      memoryFileStorageService.scheduleWithFixedDelay(
          () -> {
            try {
              if (shouldEvict(aggressiveEvictModeEnabled, evictRatio)) {
                List<PartitionDataWriter> memoryWriters =
                    new ArrayList<>(storageManager.memoryWriters().values());
                if (memoryWriters.isEmpty()) {
                  return;
                }
                logger.info("Start evicting {} memory file infos", memoryWriters.size());
                // always evict the largest memory file info first
                memoryWriters.sort(
                    Comparator.comparingLong(
                            (PartitionDataWriter w) -> w.getMemoryFileInfo().getFileLength())
                        .reversed());
                for (PartitionDataWriter writer : memoryWriters) {
                  // re-check before each eviction: stop as soon as memory pressure is relieved
                  if (!shouldEvict(aggressiveEvictModeEnabled, evictRatio)) {
                    break;
                  }
                  logger.debug("Evict writer {}", writer);
                  writer.evict(true);
                }
              }
            } catch (Exception e) {
              logger.error("Evict thread encounter error", e);
            }
          },
          checkInterval,
          checkInterval,
          TimeUnit.MILLISECONDS);
    }

    logger.info(
        "Memory tracker initialized with: "
            + "max direct memory: {}, pause push memory: {}, "
            + "pause replication memory: {},  "
            + "read buffer memory limit: {} target: {}, "
            + "memory shuffle storage limit: {}, "
            + "resume by direct memory ratio: {}, "
            + "resume by pinned memory ratio: {}",
        Utils.bytesToString(maxDirectMemory),
        Utils.bytesToString(pausePushDataThreshold),
        Utils.bytesToString(pauseReplicateThreshold),
        Utils.bytesToString(readBufferThreshold),
        Utils.bytesToString(readBufferTarget),
        Utils.bytesToString(memoryFileStorageThreshold),
        directMemoryResumeRatio,
        pinnedMemoryResumeRatio);
  }