in server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java [136:274]
/**
 * Creates the shuffle task manager and wires up its background machinery.
 *
 * <p>Every field (configuration values, the block-id manager, the application lock cache, the
 * resource-clearing thread and the top-N calculation task) is assigned before {@code this} is
 * handed to any other component or background thread. This way no periodic task, and not the
 * buffer manager, can observe a partially constructed instance.
 *
 * <p>{@code clearResourceThread} is created and marked as a daemon here, but this constructor
 * does not start it.
 *
 * @param conf shuffle server configuration; supplies the storage type, expiry/check intervals and
 *     the storage-removal timeout
 * @param shuffleFlushManager flush manager kept by this task manager
 * @param shuffleBufferManager buffer manager kept by this task manager; when non-null it is linked
 *     back to this instance via {@code setShuffleTaskManager}
 * @param storageManager storage manager kept by this task manager
 * @param shuffleMergeManager merge manager kept by this task manager
 */
public ShuffleTaskManager(
    ShuffleServerConf conf,
    ShuffleFlushManager shuffleFlushManager,
    ShuffleBufferManager shuffleBufferManager,
    StorageManager storageManager,
    ShuffleMergeManager shuffleMergeManager) {
  this.conf = conf;
  this.shuffleFlushManager = shuffleFlushManager;
  this.shuffleBufferManager = shuffleBufferManager;
  this.storageManager = storageManager;
  this.shuffleMergeManager = shuffleMergeManager;

  org.apache.uniffle.common.StorageType storageType =
      conf.get(ShuffleServerConf.RSS_STORAGE_TYPE);
  // The configured type is translated by enum name into the StorageType that offers withMemory().
  this.storageTypeWithMemory =
      storageType != null && StorageType.withMemory(StorageType.valueOf(storageType.name()));
  this.appExpiredWithoutHB = conf.getLong(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT);
  this.commitCheckIntervalMax = conf.getLong(ShuffleServerConf.SERVER_COMMIT_CHECK_INTERVAL_MAX);
  this.preAllocationExpired = conf.getLong(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED);
  this.storageRemoveOperationTimeoutSec =
      conf.getLong(ShuffleServerConf.STORAGE_REMOVE_RESOURCE_OPERATION_TIMEOUT_SEC);
  this.leakShuffleDataCheckInterval =
      conf.getLong(ShuffleServerConf.SERVER_LEAK_SHUFFLE_DATA_CHECK_INTERVAL);
  this.triggerFlushInterval = conf.getLong(ShuffleServerConf.SERVER_TRIGGER_FLUSH_CHECK_INTERVAL);

  shuffleBlockIdManager = ShuffleBlockIdManagerFactory.createShuffleBlockIdManager(conf);
  // Per-application locks; an entry is evicted one hour after it was last accessed.
  appLocks =
      CacheBuilder.newBuilder()
          .expireAfterAccess(3600, TimeUnit.SECONDS)
          .maximumSize(Integer.MAX_VALUE)
          .build();

  // The thread that consumes purge events from expiredAppIdQueue and clears their resources.
  Runnable clearResourceRunnable =
      () -> {
        while (!Thread.currentThread().isInterrupted()) {
          PurgeEvent event = null;
          try {
            event = expiredAppIdQueue.take();
            long startTime = System.currentTimeMillis();
            if (event instanceof AppPurgeEvent) {
              removeResources(event.getAppId(), true);
              double usedTime =
                  (System.currentTimeMillis() - startTime) / Constants.MILLION_SECONDS_PER_SECOND;
              ShuffleServerMetrics.summaryTotalRemoveResourceTime.observe(usedTime);
            }
            if (event instanceof AppUnregisterPurgeEvent) {
              removeResources(event.getAppId(), false);
              double usedTime =
                  (System.currentTimeMillis() - startTime) / Constants.MILLION_SECONDS_PER_SECOND;
              ShuffleServerMetrics.summaryTotalRemoveResourceTime.observe(usedTime);
            }
            if (event instanceof ShufflePurgeEvent) {
              removeResourcesByShuffleIds(event.getAppId(), event.getShuffleIds());
              double usedTime =
                  (System.currentTimeMillis() - startTime) / Constants.MILLION_SECONDS_PER_SECOND;
              ShuffleServerMetrics.summaryTotalRemoveResourceByShuffleIdsTime.observe(usedTime);
            }
          } catch (InterruptedException ie) {
            // An interrupt is a stop request. Restore the flag instead of swallowing it in the
            // generic handler below, which would otherwise keep this thread alive forever.
            Thread.currentThread().interrupt();
            LOG.info("clearResourceThread is interrupted, stop clearing expired resources");
            return;
          } catch (Exception e) {
            // Failures for a single event are logged and the loop moves on to the next event.
            StringBuilder diagnosticMessageBuilder =
                new StringBuilder(
                    "Exception happened when clearing resource for expired application");
            if (event != null) {
              diagnosticMessageBuilder.append(" for appId: ");
              diagnosticMessageBuilder.append(event.getAppId());
              if (CollectionUtils.isNotEmpty(event.getShuffleIds())) {
                diagnosticMessageBuilder.append(", shuffleIds: ");
                diagnosticMessageBuilder.append(event.getShuffleIds());
              }
            }
            LOG.error("{}", diagnosticMessageBuilder, e);
          }
        }
      };
  clearResourceThread = new Thread(clearResourceRunnable);
  clearResourceThread.setName("clearResourceThread");
  clearResourceThread.setDaemon(true);

  // ---- From here on `this` is published; all fields above are already assigned. ----

  if (shuffleBufferManager != null) {
    shuffleBufferManager.setShuffleTaskManager(this);
  }
  topNShuffleDataSizeOfAppCalcTask = new TopNShuffleDataSizeOfAppCalcTask(this, conf);
  topNShuffleDataSizeOfAppCalcTask.start();

  ShuffleServerMetrics.addLabeledGauge(REQUIRE_BUFFER_COUNT, requireBufferIds::size);
  // Block counts are aggregated across all tasks. The last argument is presumably the
  // cache-gauge refresh period in milliseconds (two minutes).
  ShuffleServerMetrics.addLabeledCacheGauge(
      REPORTED_BLOCK_COUNT,
      () ->
          shuffleBlockIdManager.getTotalBlockCount()
              + shuffleTaskInfos.values().stream()
                  .map(ShuffleTaskInfo::getShuffleBlockIdManager)
                  // Skip the shared manager: it was already counted above.
                  .filter(manager -> manager != null && manager != shuffleBlockIdManager)
                  .mapToLong(ShuffleBlockIdManager::getTotalBlockCount)
                  .sum(),
      2 * 60 * 1000L /* 2 minutes */);
  ShuffleServerMetrics.addLabeledCacheGauge(
      CACHED_BLOCK_COUNT,
      () ->
          shuffleTaskInfos.values().stream()
              .map(ShuffleTaskInfo::getCachedBlockIds)
              .flatMap(map -> map.values().stream())
              .mapToLong(Roaring64NavigableMap::getLongCardinality)
              .sum(),
      2 * 60 * 1000L /* 2 minutes */);

  // Periodic background checks are scheduled last so they never run against unassigned fields.
  // The thread for checking application status.
  this.scheduledExecutorService =
      ThreadUtils.getDaemonSingleThreadScheduledExecutor("checkResource");
  scheduledExecutorService.scheduleAtFixedRate(
      this::preAllocatedBufferCheck,
      preAllocationExpired / 2,
      preAllocationExpired / 2,
      TimeUnit.MILLISECONDS);
  this.expiredAppCleanupExecutorService =
      ThreadUtils.getDaemonSingleThreadScheduledExecutor("expiredAppCleaner");
  expiredAppCleanupExecutorService.scheduleAtFixedRate(
      this::checkResourceStatus,
      appExpiredWithoutHB / 2,
      appExpiredWithoutHB / 2,
      TimeUnit.MILLISECONDS);
  this.leakShuffleDataCheckExecutorService =
      ThreadUtils.getDaemonSingleThreadScheduledExecutor("leakShuffleDataChecker");
  leakShuffleDataCheckExecutorService.scheduleAtFixedRate(
      this::checkLeakShuffleData,
      leakShuffleDataCheckInterval,
      leakShuffleDataCheckInterval,
      TimeUnit.MILLISECONDS);
  // A non-positive interval disables the periodic flush trigger entirely.
  if (triggerFlushInterval > 0) {
    triggerFlushExecutorService =
        ThreadUtils.getDaemonSingleThreadScheduledExecutor("triggerShuffleBufferManagerFlush");
    triggerFlushExecutorService.scheduleWithFixedDelay(
        this::triggerFlush,
        triggerFlushInterval / 2,
        triggerFlushInterval,
        TimeUnit.MILLISECONDS);
  }
}