public boolean checkIsHealthy()

in server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java [98:208]


  /**
   * Checks every local storage directory in parallel, publishes the local-storage gauges and
   * updates the server's health flag.
   *
   * <p>Each directory is probed on the {@code workers} executor, bounded by {@code
   * diskCheckerExecutionTimeoutMs}. A directory whose probe times out is logged, flagged via
   * {@code gaugeLocalStorageIsTimeout}, and counted as neither healthy nor corrupted for this
   * round. Totals and counters are summed on the calling thread from completed probes only.
   * Nothing here interrupts a probe that overruns the timeout, so if probes wrote into shared
   * accumulators, a late probe could leak partial totals (or a "healthy" vote) into this round's
   * metrics and verdict.
   *
   * <p>The server becomes unhealthy when there are no storage directories, when {@code
   * markUnhealthyOnceDirCorrupted} is set and any directory failed its read/write probe, or when
   * the percentage of directories with enough space is below {@code minStorageHealthyPercentage}.
   *
   * @return the updated health flag
   * @throws RssException if a probe fails with anything other than a timeout, or if the calling
   *     thread is interrupted while waiting (the interrupt status is restored first)
   */
  public boolean checkIsHealthy() {
    Map<StorageInfo, CompletableFuture<StorageCheckResult>> futureMap = new HashMap<>();
    for (StorageInfo storageInfo : storageInfos) {
      CompletableFuture<StorageCheckResult> storageCheckFuture =
          CompletableFuture.supplyAsync(() -> checkSingleStorage(storageInfo), workers);
      futureMap.put(
          storageInfo,
          CompletableFutureExtension.orTimeout(
              storageCheckFuture, diskCheckerExecutionTimeoutMs.get(), TimeUnit.MILLISECONDS));
    }

    // Aggregated on this thread only, from probes that finished within the timeout.
    int healthyDirs = 0;
    int corruptedDirs = 0;
    long totalSpace = 0L;
    long wholeDiskUsedSpace = 0L;
    long serviceUsedSpace = 0L;
    for (Map.Entry<StorageInfo, CompletableFuture<StorageCheckResult>> entry :
        futureMap.entrySet()) {
      StorageInfo storageInfo = entry.getKey();
      StorageCheckResult result;
      try {
        result = entry.getValue().get();
      } catch (ExecutionException e) {
        if (e.getCause() instanceof TimeoutException) {
          LOG.warn(
              "Timeout of checking local storage: {}. The current disk's IO load may be very high.",
              storageInfo.storage.getBasePath());
          ShuffleServerMetrics.gaugeLocalStorageIsTimeout
              .labels(storageInfo.storage.getBasePath())
              .set(1);
          continue;
        }
        throw new RssException(e);
      } catch (InterruptedException e) {
        // Keep the interrupt visible to whoever owns this thread.
        Thread.currentThread().interrupt();
        throw new RssException(e);
      } catch (RuntimeException e) {
        // e.g. CancellationException; treated like any other unexpected failure.
        throw new RssException(e);
      }

      if (result.corrupted) {
        corruptedDirs++;
        continue;
      }
      totalSpace += result.totalSpace;
      wholeDiskUsedSpace += result.wholeDiskUsedSpace;
      serviceUsedSpace += result.serviceUsedSpace;
      if (result.spaceEnough) {
        healthyDirs++;
      }
    }

    ShuffleServerMetrics.gaugeLocalStorageTotalSpace.set(totalSpace);
    ShuffleServerMetrics.gaugeLocalStorageWholeDiskUsedSpace.set(wholeDiskUsedSpace);
    ShuffleServerMetrics.gaugeLocalStorageServiceUsedSpace.set(serviceUsedSpace);
    ShuffleServerMetrics.gaugeLocalStorageTotalDirsNum.set(storageInfos.size());
    ShuffleServerMetrics.gaugeLocalStorageCorruptedDirsNum.set(corruptedDirs);
    // With no measured capacity (no dirs, or every dir corrupted / timed out), report 0 instead
    // of the NaN that 0.0 / 0 would produce.
    ShuffleServerMetrics.gaugeLocalStorageUsedSpaceRatio.set(
        totalSpace > 0 ? wholeDiskUsedSpace * 1.0 / totalSpace : 0.0);

    if (storageInfos.isEmpty()) {
      if (isHealthy) {
        LOG.info("shuffle server become unhealthy because of empty storage");
      }
      isHealthy = false;
      return false;
    }

    if (markUnhealthyOnceDirCorrupted && corruptedDirs > 0) {
      if (isHealthy) {
        LOG.info(
            "shuffle server become unhealthy because {} corrupted dirs exist", corruptedDirs);
      }
      isHealthy = false;
      return false;
    }

    double availablePercentage = healthyDirs * 100.0 / storageInfos.size();
    if (Double.compare(availablePercentage, minStorageHealthyPercentage) >= 0) {
      if (!isHealthy) {
        LOG.info("shuffle server become healthy");
      }
      isHealthy = true;
    } else {
      if (isHealthy) {
        LOG.info("shuffle server become unhealthy");
      }
      isHealthy = false;
    }
    return isHealthy;
  }

  /**
   * Probes one storage directory and applies the per-directory side effects.
   *
   * <p>On a failed read/write probe the directory is marked corrupted. Otherwise this records its
   * service-used and free bytes on {@code storageInfo} and sets its writable/timeout gauges. It
   * runs on a worker thread, so it returns its measurements instead of writing shared totals.
   *
   * @param storageInfo the directory to probe
   * @return the measurements, or {@link StorageCheckResult#CORRUPTED} if the read/write probe
   *     failed
   */
  private StorageCheckResult checkSingleStorage(StorageInfo storageInfo) {
    if (!storageInfo.checkStorageReadAndWrite()) {
      storageInfo.markCorrupted();
      return StorageCheckResult.CORRUPTED;
    }

    long total = getTotalSpace(storageInfo.storageDir);
    long availableBytes = getFreeSpace(storageInfo.storageDir);
    long usedBytes = getServiceUsedSpace(storageInfo.storageDir);
    storageInfo.updateServiceUsedBytes(usedBytes);
    storageInfo.updateStorageFreeSpace(availableBytes);

    boolean isWritable = storageInfo.canWrite();
    ShuffleServerMetrics.gaugeLocalStorageIsWritable
        .labels(storageInfo.storage.getBasePath())
        .set(isWritable ? 0 : 1);
    ShuffleServerMetrics.gaugeLocalStorageIsTimeout
        .labels(storageInfo.storage.getBasePath())
        .set(0);

    boolean spaceEnough = storageInfo.checkIsSpaceEnough(total, availableBytes);
    return new StorageCheckResult(false, total, total - availableBytes, usedBytes, spaceEnough);
  }

  /** Immutable outcome of {@link #checkSingleStorage}, summed by {@link #checkIsHealthy}. */
  private static final class StorageCheckResult {
    /** Shared result for a directory whose read/write probe failed; space fields are unused. */
    private static final StorageCheckResult CORRUPTED =
        new StorageCheckResult(true, 0L, 0L, 0L, false);

    private final boolean corrupted;
    private final long totalSpace;
    private final long wholeDiskUsedSpace;
    private final long serviceUsedSpace;
    private final boolean spaceEnough;

    private StorageCheckResult(
        boolean corrupted,
        long totalSpace,
        long wholeDiskUsedSpace,
        long serviceUsedSpace,
        boolean spaceEnough) {
      this.corrupted = corrupted;
      this.totalSpace = totalSpace;
      this.wholeDiskUsedSpace = wholeDiskUsedSpace;
      this.serviceUsedSpace = serviceUsedSpace;
      this.spaceEnough = spaceEnough;
    }
  }