public ShuffleTaskManager()

in server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java [136:274]


  /**
   * Creates the task manager and wires up its background housekeeping.
   *
   * <p>Besides storing the collaborators and reading intervals/timeouts from {@code conf}, the
   * constructor schedules the periodic checks (pre-allocated buffers, expired applications,
   * leaked shuffle data and, when the interval is positive, flush triggering), registers this
   * instance with the buffer manager, builds the purge-event consumer thread, starts the {@code
   * TopNShuffleDataSizeOfAppCalcTask} and registers the gauges.
   *
   * <p>The purge-event consumer thread is only created and configured here; this constructor does
   * not start it.
   *
   * @param conf server configuration the storage type, intervals and timeouts are read from
   * @param shuffleFlushManager flush manager stored on this instance
   * @param shuffleBufferManager buffer manager; when non-null it receives a reference to this
   *     instance
   * @param storageManager storage manager stored on this instance
   * @param shuffleMergeManager merge manager stored on this instance
   */
  public ShuffleTaskManager(
      ShuffleServerConf conf,
      ShuffleFlushManager shuffleFlushManager,
      ShuffleBufferManager shuffleBufferManager,
      StorageManager storageManager,
      ShuffleMergeManager shuffleMergeManager) {
    // Collaborators.
    this.conf = conf;
    this.shuffleFlushManager = shuffleFlushManager;
    this.shuffleBufferManager = shuffleBufferManager;
    this.storageManager = storageManager;
    this.shuffleMergeManager = shuffleMergeManager;

    // A missing storage type is treated as "no memory storage".
    org.apache.uniffle.common.StorageType configuredStorageType =
        conf.get(ShuffleServerConf.RSS_STORAGE_TYPE);
    this.storageTypeWithMemory =
        configuredStorageType != null
            && StorageType.withMemory(StorageType.valueOf(configuredStorageType.name()));

    // Intervals and timeouts.
    this.appExpiredWithoutHB = conf.getLong(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT);
    this.commitCheckIntervalMax = conf.getLong(ShuffleServerConf.SERVER_COMMIT_CHECK_INTERVAL_MAX);
    this.preAllocationExpired = conf.getLong(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED);
    this.storageRemoveOperationTimeoutSec =
        conf.getLong(ShuffleServerConf.STORAGE_REMOVE_RESOURCE_OPERATION_TIMEOUT_SEC);
    this.leakShuffleDataCheckInterval =
        conf.getLong(ShuffleServerConf.SERVER_LEAK_SHUFFLE_DATA_CHECK_INTERVAL);
    this.triggerFlushInterval = conf.getLong(ShuffleServerConf.SERVER_TRIGGER_FLUSH_CHECK_INTERVAL);

    // Pre-allocated buffer check: first run and period are both half the expiry time.
    long preAllocationCheckPeriod = preAllocationExpired / 2;
    this.scheduledExecutorService =
        ThreadUtils.getDaemonSingleThreadScheduledExecutor("checkResource");
    scheduledExecutorService.scheduleAtFixedRate(
        this::preAllocatedBufferCheck,
        preAllocationCheckPeriod,
        preAllocationCheckPeriod,
        TimeUnit.MILLISECONDS);

    // Application status check: first run and period are both half the heartbeat expiry time.
    long appStatusCheckPeriod = appExpiredWithoutHB / 2;
    this.expiredAppCleanupExecutorService =
        ThreadUtils.getDaemonSingleThreadScheduledExecutor("expiredAppCleaner");
    expiredAppCleanupExecutorService.scheduleAtFixedRate(
        this::checkResourceStatus,
        appStatusCheckPeriod,
        appStatusCheckPeriod,
        TimeUnit.MILLISECONDS);

    // Leaked shuffle data check, run at the configured interval.
    this.leakShuffleDataCheckExecutorService =
        ThreadUtils.getDaemonSingleThreadScheduledExecutor("leakShuffleDataChecker");
    leakShuffleDataCheckExecutorService.scheduleAtFixedRate(
        this::checkLeakShuffleData,
        leakShuffleDataCheckInterval,
        leakShuffleDataCheckInterval,
        TimeUnit.MILLISECONDS);

    // Flush triggering is only scheduled for a positive interval; the first run happens after
    // half an interval, later runs a full interval after the previous one finished.
    if (triggerFlushInterval > 0) {
      long firstFlushCheckDelay = triggerFlushInterval / 2;
      triggerFlushExecutorService =
          ThreadUtils.getDaemonSingleThreadScheduledExecutor("triggerShuffleBufferManagerFlush");
      triggerFlushExecutorService.scheduleWithFixedDelay(
          this::triggerFlush, firstFlushCheckDelay, triggerFlushInterval, TimeUnit.MILLISECONDS);
    }

    if (shuffleBufferManager != null) {
      shuffleBufferManager.setShuffleTaskManager(this);
    }

    shuffleBlockIdManager = ShuffleBlockIdManagerFactory.createShuffleBlockIdManager(conf);

    // Entries are dropped after 3600 seconds without access.
    appLocks =
        CacheBuilder.newBuilder()
            .expireAfterAccess(3600, TimeUnit.SECONDS)
            .maximumSize(Integer.MAX_VALUE)
            .build();

    // Consumer of purge events: blocks on the queue, removes the matching resources and records
    // how long the removal took. Any exception is logged and the loop keeps running.
    Runnable purgeEventConsumer =
        () -> {
          for (; ; ) {
            PurgeEvent purgeEvent = null;
            try {
              purgeEvent = expiredAppIdQueue.take();
              long beginMillis = System.currentTimeMillis();
              // The three checks are intentionally independent of each other.
              if (purgeEvent instanceof AppPurgeEvent) {
                removeResources(purgeEvent.getAppId(), true);
                double elapsedSec =
                    (System.currentTimeMillis() - beginMillis)
                        / Constants.MILLION_SECONDS_PER_SECOND;
                ShuffleServerMetrics.summaryTotalRemoveResourceTime.observe(elapsedSec);
              }
              if (purgeEvent instanceof AppUnregisterPurgeEvent) {
                removeResources(purgeEvent.getAppId(), false);
                double elapsedSec =
                    (System.currentTimeMillis() - beginMillis)
                        / Constants.MILLION_SECONDS_PER_SECOND;
                ShuffleServerMetrics.summaryTotalRemoveResourceTime.observe(elapsedSec);
              }
              if (purgeEvent instanceof ShufflePurgeEvent) {
                removeResourcesByShuffleIds(purgeEvent.getAppId(), purgeEvent.getShuffleIds());
                double elapsedSec =
                    (System.currentTimeMillis() - beginMillis)
                        / Constants.MILLION_SECONDS_PER_SECOND;
                ShuffleServerMetrics.summaryTotalRemoveResourceByShuffleIdsTime.observe(
                    elapsedSec);
              }
            } catch (Exception e) {
              // Add whatever is known about the event to the log message.
              StringBuilder failureMessage =
                  new StringBuilder(
                      "Exception happened when clearing resource for expired application");
              if (purgeEvent != null) {
                failureMessage.append(" for appId: ").append(purgeEvent.getAppId());
                if (CollectionUtils.isNotEmpty(purgeEvent.getShuffleIds())) {
                  failureMessage.append(", shuffleIds: ").append(purgeEvent.getShuffleIds());
                }
              }
              LOG.error("{}", failureMessage, e);
            }
          }
        };
    // Created as a named daemon thread; it is not started in this constructor.
    clearResourceThread = new Thread(purgeEventConsumer);
    clearResourceThread.setName("clearResourceThread");
    clearResourceThread.setDaemon(true);

    topNShuffleDataSizeOfAppCalcTask = new TopNShuffleDataSizeOfAppCalcTask(this, conf);
    topNShuffleDataSizeOfAppCalcTask.start();

    // Passed as the last argument of both cached gauges below.
    // NOTE(review): presumably the gauge cache lifetime - confirm against ShuffleServerMetrics.
    long twoMinutesInMillis = 2 * 60 * 1000L;

    ShuffleServerMetrics.addLabeledGauge(REQUIRE_BUFFER_COUNT, requireBufferIds::size);
    ShuffleServerMetrics.addLabeledCacheGauge(
        REPORTED_BLOCK_COUNT,
        () -> {
          // Blocks of the manager created above plus those of any other manager referenced by a
          // task info (null managers and the manager created above are skipped).
          long ownManagerBlocks = shuffleBlockIdManager.getTotalBlockCount();
          long otherManagerBlocks =
              shuffleTaskInfos.values().stream()
                  .map(ShuffleTaskInfo::getShuffleBlockIdManager)
                  .filter(manager -> manager != null && manager != shuffleBlockIdManager)
                  .mapToLong(ShuffleBlockIdManager::getTotalBlockCount)
                  .sum();
          return ownManagerBlocks + otherManagerBlocks;
        },
        twoMinutesInMillis);
    ShuffleServerMetrics.addLabeledCacheGauge(
        CACHED_BLOCK_COUNT,
        () -> {
          // Sum of the cardinalities of every cached block id bitmap across all task infos.
          return shuffleTaskInfos.values().stream()
              .map(ShuffleTaskInfo::getCachedBlockIds)
              .flatMap(cachedBlockIds -> cachedBlockIds.values().stream())
              .mapToLong(Roaring64NavigableMap::getLongCardinality)
              .sum();
        },
        twoMinutesInMillis);
  }