in server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java [98:208]
/**
 * Checks every configured local storage directory and updates the server health flag.
 *
 * <p>Each directory is probed asynchronously on {@code workers}; a probe that does not finish
 * within {@code diskCheckerExecutionTimeoutMs} is reported through the timeout gauge and is not
 * counted as an available directory. Aggregated space figures and the corrupted-directory count
 * are then published to {@link ShuffleServerMetrics}.
 *
 * <p>The server is considered unhealthy when there is no storage directory at all, when
 * {@code markUnhealthyOnceDirCorrupted} is set and at least one directory is corrupted, or when
 * the percentage of directories with enough space is below {@code minStorageHealthyPercentage}.
 *
 * @return the new value of {@code isHealthy}
 * @throws RssException if a storage check fails for a reason other than a timeout, or if the
 *     calling thread is interrupted while waiting for a check to complete
 */
public boolean checkIsHealthy() {
  AtomicInteger num = new AtomicInteger(0);
  AtomicLong totalSpace = new AtomicLong(0L);
  AtomicLong wholeDiskUsedSpace = new AtomicLong(0L);
  AtomicLong serviceUsedSpace = new AtomicLong(0L);
  AtomicInteger corruptedDirs = new AtomicInteger(0);
  Map<StorageInfo, CompletableFuture<Void>> futureMap = new HashMap<>();
  for (StorageInfo storageInfo : storageInfos) {
    CompletableFuture<Void> storageCheckFuture =
        CompletableFuture.supplyAsync(
            () -> {
              if (!storageInfo.checkStorageReadAndWrite()) {
                // A corrupted directory contributes nothing to the space totals.
                storageInfo.markCorrupted();
                corruptedDirs.incrementAndGet();
                return null;
              }
              long total = getTotalSpace(storageInfo.storageDir);
              long availableBytes = getFreeSpace(storageInfo.storageDir);
              totalSpace.addAndGet(total);
              wholeDiskUsedSpace.addAndGet(total - availableBytes);
              long usedBytes = getServiceUsedSpace(storageInfo.storageDir);
              serviceUsedSpace.addAndGet(usedBytes);
              storageInfo.updateServiceUsedBytes(usedBytes);
              storageInfo.updateStorageFreeSpace(availableBytes);
              boolean isWritable = storageInfo.canWrite();
              // NOTE(review): this gauge is set to 0 when the storage is writable and 1
              // otherwise, which looks inverted relative to its name. Kept unchanged because
              // external consumers may depend on the current values -- confirm.
              ShuffleServerMetrics.gaugeLocalStorageIsWritable
                  .labels(storageInfo.storage.getBasePath())
                  .set(isWritable ? 0 : 1);
              // The check ran to completion, so clear the timeout gauge for this path.
              ShuffleServerMetrics.gaugeLocalStorageIsTimeout
                  .labels(storageInfo.storage.getBasePath())
                  .set(0);
              if (storageInfo.checkIsSpaceEnough(total, availableBytes)) {
                num.incrementAndGet();
              }
              return null;
            },
            workers);
    futureMap.put(
        storageInfo,
        CompletableFutureExtension.orTimeout(
            storageCheckFuture, diskCheckerExecutionTimeoutMs.get(), TimeUnit.MILLISECONDS));
  }
  for (Map.Entry<StorageInfo, CompletableFuture<Void>> entry : futureMap.entrySet()) {
    StorageInfo storageInfo = entry.getKey();
    CompletableFuture<Void> f = entry.getValue();
    try {
      f.get();
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers further up can still observe the interruption.
      Thread.currentThread().interrupt();
      throw new RssException(e);
    } catch (Exception e) {
      if (e instanceof ExecutionException && e.getCause() instanceof TimeoutException) {
        LOG.warn(
            "Timeout of checking local storage: {}. The current disk's IO load may be very high.",
            storageInfo.storage.getBasePath());
        ShuffleServerMetrics.gaugeLocalStorageIsTimeout
            .labels(storageInfo.storage.getBasePath())
            .set(1);
        continue;
      }
      throw new RssException(e);
    }
  }
  // Snapshot the counters once so the total and ratio gauges are computed from the same values.
  long totalSpaceBytes = totalSpace.get();
  long wholeDiskUsedBytes = wholeDiskUsedSpace.get();
  ShuffleServerMetrics.gaugeLocalStorageTotalSpace.set(totalSpaceBytes);
  ShuffleServerMetrics.gaugeLocalStorageWholeDiskUsedSpace.set(wholeDiskUsedBytes);
  ShuffleServerMetrics.gaugeLocalStorageServiceUsedSpace.set(serviceUsedSpace.get());
  ShuffleServerMetrics.gaugeLocalStorageTotalDirsNum.set(storageInfos.size());
  ShuffleServerMetrics.gaugeLocalStorageCorruptedDirsNum.set(corruptedDirs.get());
  // The total is 0 when no directory reported its space (no storage configured, or every check
  // was corrupted or timed out); report 0 instead of dividing by zero and publishing NaN.
  double usedSpaceRatio = totalSpaceBytes > 0 ? wholeDiskUsedBytes * 1.0 / totalSpaceBytes : 0.0;
  ShuffleServerMetrics.gaugeLocalStorageUsedSpaceRatio.set(usedSpaceRatio);
  if (storageInfos.isEmpty()) {
    if (isHealthy) {
      LOG.info("shuffle server become unhealthy because of empty storage");
    }
    isHealthy = false;
    return false;
  }
  if (markUnhealthyOnceDirCorrupted && corruptedDirs.get() > 0) {
    if (isHealthy) {
      LOG.info(
          "shuffle server become unhealthy because {} corrupted dirs exist", corruptedDirs.get());
    }
    isHealthy = false;
    return false;
  }
  // Percentage of directories that completed their check with enough free space.
  double availablePercentage = num.get() * 100.0 / storageInfos.size();
  if (Double.compare(availablePercentage, minStorageHealthyPercentage) >= 0) {
    if (!isHealthy) {
      LOG.info("shuffle server become healthy");
    }
    isHealthy = true;
  } else {
    if (isHealthy) {
      LOG.info("shuffle server become unhealthy");
    }
    isHealthy = false;
  }
  return isHealthy;
}