in server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java [78:139]
public boolean checkIsHealthy() {
  // Purpose: probe every entry of storageInfos in parallel, publish the aggregated
  // disk metrics to ShuffleServerMetrics, and refresh the cached isHealthy flag.
  // Returns: true when the percentage of storages that pass both
  // checkStorageReadAndWrite() and checkIsSpaceEnough() is at least
  // minStorageHealthyPercentage; false otherwise, and always false when
  // storageInfos is empty.
  AtomicInteger num = new AtomicInteger(0);
  AtomicLong totalSpace = new AtomicLong(0L);
  AtomicLong wholeDiskUsedSpace = new AtomicLong(0L);
  AtomicLong serviceUsedSpace = new AtomicLong(0L);
  AtomicInteger corruptedDirs = new AtomicInteger(0);
  CountDownLatch cdl = new CountDownLatch(storageInfos.size());
  storageInfos
      .parallelStream()
      .forEach(
          storageInfo -> {
            try {
              if (!storageInfo.checkStorageReadAndWrite()) {
                // A storage that fails the read/write probe is flagged and excluded from
                // the space totals and from the count of usable storages.
                storageInfo.markCorrupted();
                corruptedDirs.incrementAndGet();
                return;
              }
              totalSpace.addAndGet(getTotalSpace(storageInfo.storageDir));
              wholeDiskUsedSpace.addAndGet(getWholeDiskUsedSpace(storageInfo.storageDir));
              serviceUsedSpace.addAndGet(getServiceUsedSpace(storageInfo.storageDir));
              if (storageInfo.checkIsSpaceEnough()) {
                num.incrementAndGet();
              }
            } finally {
              // Single release point so every exit path of the probe counts down exactly once.
              cdl.countDown();
            }
          });
  try {
    cdl.await();
  } catch (InterruptedException e) {
    LOG.error("Failed to check local storage!", e);
    // Restore the interrupt flag so the caller can still observe the interruption.
    Thread.currentThread().interrupt();
  }

  long total = totalSpace.get();
  long wholeDiskUsed = wholeDiskUsedSpace.get();
  // With no storages, or with every storage corrupted, total is 0; report a ratio of 0
  // rather than pushing NaN/Infinity into the gauge.
  double usedSpaceRatio = total > 0 ? wholeDiskUsed * 1.0 / total : 0.0;

  ShuffleServerMetrics.gaugeLocalStorageTotalSpace.set(total);
  ShuffleServerMetrics.gaugeLocalStorageWholeDiskUsedSpace.set(wholeDiskUsed);
  ShuffleServerMetrics.gaugeLocalStorageServiceUsedSpace.set(serviceUsedSpace.get());
  ShuffleServerMetrics.gaugeLocalStorageTotalDirsNum.set(storageInfos.size());
  ShuffleServerMetrics.gaugeLocalStorageCorruptedDirsNum.set(corruptedDirs.get());
  ShuffleServerMetrics.gaugeLocalStorageUsedSpaceRatio.set(usedSpaceRatio);

  if (storageInfos.isEmpty()) {
    // Log only on the healthy -> unhealthy transition to avoid repeating the message.
    if (isHealthy) {
      LOG.info("shuffle server become unhealthy because of empty storage");
    }
    isHealthy = false;
    return false;
  }

  double availablePercentage = num.get() * 100.0 / storageInfos.size();
  if (Double.compare(availablePercentage, minStorageHealthyPercentage) >= 0) {
    if (!isHealthy) {
      LOG.info("shuffle server become healthy");
    }
    isHealthy = true;
  } else {
    if (isHealthy) {
      LOG.info("shuffle server become unhealthy");
    }
    isHealthy = false;
  }
  return isHealthy;
}