public LocalStorageManager()

in server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java [97:194]


  public LocalStorageManager(ShuffleServerConf conf) {
    super(conf);
    storageBasePaths = RssUtils.getConfiguredLocalDirs(conf);
    if (CollectionUtils.isEmpty(storageBasePaths)) {
      throw new IllegalArgumentException("Base path dirs must not be empty");
    }
    this.sortedPartitionsOfStorageMap = new ConcurrentSkipListMap<>();
    long capacity = conf.getSizeAsBytes(ShuffleServerConf.DISK_CAPACITY);
    double ratio = conf.getDouble(ShuffleServerConf.DISK_CAPACITY_RATIO);
    double highWaterMarkOfWrite = conf.get(ShuffleServerConf.HIGH_WATER_MARK_OF_WRITE);
    double lowWaterMarkOfWrite = conf.get(ShuffleServerConf.LOW_WATER_MARK_OF_WRITE);
    if (highWaterMarkOfWrite < lowWaterMarkOfWrite) {
      throw new IllegalArgumentException(
          "highWaterMarkOfWrite must be larger than lowWaterMarkOfWrite");
    }

    // We must make sure the order of `storageBasePaths` and `localStorages` is same, or some unit
    // test may be fail
    CountDownLatch countDownLatch = new CountDownLatch(storageBasePaths.size());
    AtomicInteger successCount = new AtomicInteger();
    ServiceLoader<StorageMediaProvider> loader = ServiceLoader.load(StorageMediaProvider.class);
    for (StorageMediaProvider provider : loader) {
      provider.init(conf);
      typeProviders.add(provider);
    }
    ExecutorService executorService = ThreadUtils.getDaemonCachedThreadPool("LocalStorage-check");
    LocalStorage[] localStorageArray = new LocalStorage[storageBasePaths.size()];
    boolean isDiskCapacityWatermarkCheckEnabled = conf.get(DISK_CAPACITY_WATERMARK_CHECK_ENABLED);
    for (int i = 0; i < storageBasePaths.size(); i++) {
      final int idx = i;
      String storagePath = storageBasePaths.get(i);
      executorService.submit(
          () -> {
            try {
              StorageMedia storageType = getStorageTypeForBasePath(storagePath);
              LocalStorage.Builder builder =
                  LocalStorage.newBuilder()
                      .basePath(storagePath)
                      .capacity(capacity)
                      .ratio(ratio)
                      .lowWaterMarkOfWrite(lowWaterMarkOfWrite)
                      .highWaterMarkOfWrite(highWaterMarkOfWrite)
                      .setId(idx)
                      .localStorageMedia(storageType);
              if (isDiskCapacityWatermarkCheckEnabled) {
                builder.enableDiskCapacityWatermarkCheck();
              }
              localStorageArray[idx] = builder.build();
              successCount.incrementAndGet();
            } catch (Exception e) {
              LOG.error("LocalStorage init failed!", e);
            } finally {
              countDownLatch.countDown();
            }
          });
    }

    try {
      countDownLatch.await();
    } catch (InterruptedException e) {
      LOG.error("Failed to wait initializing local storage.", e);
    }
    executorService.shutdown();

    int failedCount = storageBasePaths.size() - successCount.get();
    long maxFailedNumber = conf.getLong(LOCAL_STORAGE_INITIALIZE_MAX_FAIL_NUMBER);
    if (failedCount > maxFailedNumber || successCount.get() == 0) {
      throw new RssException(
          String.format(
              "Initialize %s local storage(s) failed, "
                  + "specified local storage paths size: %s, the conf of %s size: %s",
              failedCount,
              localStorageArray.length,
              LOCAL_STORAGE_INITIALIZE_MAX_FAIL_NUMBER.key(),
              maxFailedNumber));
    }
    localStorages =
        Arrays.stream(localStorageArray).filter(Objects::nonNull).collect(Collectors.toList());
    LOG.info(
        "Succeed to initialize storage paths: {}",
        StringUtils.join(
            localStorages.stream().map(LocalStorage::getBasePath).collect(Collectors.toList())));
    this.checker = new LocalStorageChecker(conf, localStorages);
    isStorageAuditLogEnabled =
        conf.getReconfigurableConf(ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED).get();
    ReconfigurableRegistry.register(
        ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED.toString(),
        (rssConf, changedProperties) -> {
          if (changedProperties == null || rssConf == null) {
            return;
          }
          if (changedProperties.contains(
              ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED.key())) {
            isStorageAuditLogEnabled =
                rssConf.getBoolean(ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED);
          }
        });
  }