in server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java [105:177]
/**
 * Builds the task manager, reads its timing settings from {@code conf}, and starts its
 * background work.
 *
 * <p>Construction has side effects. It schedules the periodic tasks {@code
 * preAllocatedBufferCheck}, {@code checkResourceStatus} and {@code checkLeakShuffleData}, plus
 * {@code triggerFlush} when the configured trigger-flush interval is positive. It registers this
 * instance on {@code shuffleBufferManager} when that argument is non-null, and it starts a daemon
 * thread named {@code clearResourceThread} that consumes purge events from {@code
 * expiredAppIdQueue}. All configured intervals are passed to the schedulers as milliseconds.
 *
 * @param conf server configuration the intervals and expiry values are read from
 * @param shuffleFlushManager flush manager kept for later use by this instance
 * @param shuffleBufferManager buffer manager kept for later use; may be {@code null}, in which
 *     case the back-reference registration is skipped
 * @param storageManager storage manager kept for later use by this instance
 */
public ShuffleTaskManager(
    ShuffleServerConf conf,
    ShuffleFlushManager shuffleFlushManager,
    ShuffleBufferManager shuffleBufferManager,
    StorageManager storageManager) {
  // Collaborators and per-instance state.
  this.conf = conf;
  this.storageManager = storageManager;
  this.shuffleFlushManager = shuffleFlushManager;
  this.shuffleBufferManager = shuffleBufferManager;
  this.partitionsToBlockIds = JavaUtils.newConcurrentMap();

  // Timing settings.
  this.appExpiredWithoutHB = conf.getLong(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT);
  this.commitCheckIntervalMax = conf.getLong(ShuffleServerConf.SERVER_COMMIT_CHECK_INTERVAL_MAX);
  this.preAllocationExpired = conf.getLong(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED);
  this.leakShuffleDataCheckInterval =
      conf.getLong(ShuffleServerConf.SERVER_LEAK_SHUFFLE_DATA_CHECK_INTERVAL);
  this.triggerFlushInterval = conf.getLong(ShuffleServerConf.SERVER_TRIGGER_FLUSH_CHECK_INTERVAL);

  // Pre-allocated buffer check: first run and period are both half the pre-allocation expiry.
  this.scheduledExecutorService =
      ThreadUtils.getDaemonSingleThreadScheduledExecutor("checkResource");
  final long preAllocationCheckPeriod = this.preAllocationExpired / 2;
  this.scheduledExecutorService.scheduleAtFixedRate(
      this::preAllocatedBufferCheck,
      preAllocationCheckPeriod,
      preAllocationCheckPeriod,
      TimeUnit.MILLISECONDS);

  // Resource status check: first run and period are both half the no-heartbeat expiry.
  this.expiredAppCleanupExecutorService =
      ThreadUtils.getDaemonSingleThreadScheduledExecutor("expiredAppCleaner");
  final long expiredAppCheckPeriod = this.appExpiredWithoutHB / 2;
  this.expiredAppCleanupExecutorService.scheduleAtFixedRate(
      this::checkResourceStatus,
      expiredAppCheckPeriod,
      expiredAppCheckPeriod,
      TimeUnit.MILLISECONDS);

  // Leaked shuffle data check: runs at the full configured interval.
  this.leakShuffleDataCheckExecutorService =
      ThreadUtils.getDaemonSingleThreadScheduledExecutor("leakShuffleDataChecker");
  this.leakShuffleDataCheckExecutorService.scheduleAtFixedRate(
      this::checkLeakShuffleData,
      this.leakShuffleDataCheckInterval,
      this.leakShuffleDataCheckInterval,
      TimeUnit.MILLISECONDS);

  // Flush trigger is optional: a non-positive interval leaves the executor unset. Unlike the
  // tasks above it uses a fixed delay, with an initial delay of half the interval.
  if (this.triggerFlushInterval > 0) {
    this.triggerFlushExecutorService =
        ThreadUtils.getDaemonSingleThreadScheduledExecutor("triggerShuffleBufferManagerFlush");
    final long firstFlushDelay = this.triggerFlushInterval / 2;
    this.triggerFlushExecutorService.scheduleWithFixedDelay(
        this::triggerFlush, firstFlushDelay, this.triggerFlushInterval, TimeUnit.MILLISECONDS);
  }

  // Give the buffer manager a back-reference to this instance when one was supplied.
  if (shuffleBufferManager != null) {
    shuffleBufferManager.setShuffleTaskManager(this);
  }

  // Worker loop that takes purge events off the queue and removes the matching resources.
  // Any exception is logged and the loop keeps going, so one failed purge does not stop cleanup.
  // NOTE(review): if take() can throw InterruptedException (as BlockingQueue.take does), it is
  // caught here too, so an interrupt would be logged and the loop would continue - confirm this
  // is intended.
  this.clearResourceThread =
      () -> {
        for (; ; ) {
          try {
            PurgeEvent purgeEvent = expiredAppIdQueue.take();
            // The two checks are independent; each matching event type triggers its own removal.
            if (purgeEvent instanceof AppPurgeEvent) {
              removeResources(purgeEvent.getAppId());
            }
            if (purgeEvent instanceof ShufflePurgeEvent) {
              removeResourcesByShuffleIds(purgeEvent.getAppId(), purgeEvent.getShuffleIds());
            }
          } catch (Exception ex) {
            LOG.error("Exception happened when clear resource for expired application", ex);
          }
        }
      };
  Thread purgeWorker = new Thread(this.clearResourceThread);
  purgeWorker.setDaemon(true);
  purgeWorker.setName("clearResourceThread");
  purgeWorker.start();
}