public ShuffleTaskManager()

in server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java [105:177]


  public ShuffleTaskManager(
      ShuffleServerConf conf,
      ShuffleFlushManager shuffleFlushManager,
      ShuffleBufferManager shuffleBufferManager,
      StorageManager storageManager) {
    this.conf = conf;
    this.shuffleFlushManager = shuffleFlushManager;
    this.partitionsToBlockIds = JavaUtils.newConcurrentMap();
    this.shuffleBufferManager = shuffleBufferManager;
    this.storageManager = storageManager;
    this.appExpiredWithoutHB = conf.getLong(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT);
    this.commitCheckIntervalMax = conf.getLong(ShuffleServerConf.SERVER_COMMIT_CHECK_INTERVAL_MAX);
    this.preAllocationExpired = conf.getLong(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED);
    this.leakShuffleDataCheckInterval =
        conf.getLong(ShuffleServerConf.SERVER_LEAK_SHUFFLE_DATA_CHECK_INTERVAL);
    this.triggerFlushInterval = conf.getLong(ShuffleServerConf.SERVER_TRIGGER_FLUSH_CHECK_INTERVAL);
    // the thread for checking application status
    this.scheduledExecutorService =
        ThreadUtils.getDaemonSingleThreadScheduledExecutor("checkResource");
    scheduledExecutorService.scheduleAtFixedRate(
        this::preAllocatedBufferCheck,
        preAllocationExpired / 2,
        preAllocationExpired / 2,
        TimeUnit.MILLISECONDS);
    this.expiredAppCleanupExecutorService =
        ThreadUtils.getDaemonSingleThreadScheduledExecutor("expiredAppCleaner");
    expiredAppCleanupExecutorService.scheduleAtFixedRate(
        this::checkResourceStatus,
        appExpiredWithoutHB / 2,
        appExpiredWithoutHB / 2,
        TimeUnit.MILLISECONDS);
    this.leakShuffleDataCheckExecutorService =
        ThreadUtils.getDaemonSingleThreadScheduledExecutor("leakShuffleDataChecker");
    leakShuffleDataCheckExecutorService.scheduleAtFixedRate(
        this::checkLeakShuffleData,
        leakShuffleDataCheckInterval,
        leakShuffleDataCheckInterval,
        TimeUnit.MILLISECONDS);
    if (triggerFlushInterval > 0) {
      triggerFlushExecutorService =
          ThreadUtils.getDaemonSingleThreadScheduledExecutor("triggerShuffleBufferManagerFlush");
      triggerFlushExecutorService.scheduleWithFixedDelay(
          this::triggerFlush,
          triggerFlushInterval / 2,
          triggerFlushInterval,
          TimeUnit.MILLISECONDS);
    }
    if (shuffleBufferManager != null) {
      shuffleBufferManager.setShuffleTaskManager(this);
    }

    // the thread for clear expired resources
    clearResourceThread =
        () -> {
          while (true) {
            try {
              PurgeEvent event = expiredAppIdQueue.take();
              if (event instanceof AppPurgeEvent) {
                removeResources(event.getAppId());
              }
              if (event instanceof ShufflePurgeEvent) {
                removeResourcesByShuffleIds(event.getAppId(), event.getShuffleIds());
              }
            } catch (Exception e) {
              LOG.error("Exception happened when clear resource for expired application", e);
            }
          }
        };
    Thread thread = new Thread(clearResourceThread);
    thread.setName("clearResourceThread");
    thread.setDaemon(true);
    thread.start();
  }