in server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java [97:194]
public LocalStorageManager(ShuffleServerConf conf) {
super(conf);
storageBasePaths = RssUtils.getConfiguredLocalDirs(conf);
if (CollectionUtils.isEmpty(storageBasePaths)) {
throw new IllegalArgumentException("Base path dirs must not be empty");
}
this.sortedPartitionsOfStorageMap = new ConcurrentSkipListMap<>();
long capacity = conf.getSizeAsBytes(ShuffleServerConf.DISK_CAPACITY);
double ratio = conf.getDouble(ShuffleServerConf.DISK_CAPACITY_RATIO);
double highWaterMarkOfWrite = conf.get(ShuffleServerConf.HIGH_WATER_MARK_OF_WRITE);
double lowWaterMarkOfWrite = conf.get(ShuffleServerConf.LOW_WATER_MARK_OF_WRITE);
if (highWaterMarkOfWrite < lowWaterMarkOfWrite) {
throw new IllegalArgumentException(
"highWaterMarkOfWrite must be larger than lowWaterMarkOfWrite");
}
// We must make sure the order of `storageBasePaths` and `localStorages` is same, or some unit
// test may be fail
CountDownLatch countDownLatch = new CountDownLatch(storageBasePaths.size());
AtomicInteger successCount = new AtomicInteger();
ServiceLoader<StorageMediaProvider> loader = ServiceLoader.load(StorageMediaProvider.class);
for (StorageMediaProvider provider : loader) {
provider.init(conf);
typeProviders.add(provider);
}
ExecutorService executorService = ThreadUtils.getDaemonCachedThreadPool("LocalStorage-check");
LocalStorage[] localStorageArray = new LocalStorage[storageBasePaths.size()];
boolean isDiskCapacityWatermarkCheckEnabled = conf.get(DISK_CAPACITY_WATERMARK_CHECK_ENABLED);
for (int i = 0; i < storageBasePaths.size(); i++) {
final int idx = i;
String storagePath = storageBasePaths.get(i);
executorService.submit(
() -> {
try {
StorageMedia storageType = getStorageTypeForBasePath(storagePath);
LocalStorage.Builder builder =
LocalStorage.newBuilder()
.basePath(storagePath)
.capacity(capacity)
.ratio(ratio)
.lowWaterMarkOfWrite(lowWaterMarkOfWrite)
.highWaterMarkOfWrite(highWaterMarkOfWrite)
.setId(idx)
.localStorageMedia(storageType);
if (isDiskCapacityWatermarkCheckEnabled) {
builder.enableDiskCapacityWatermarkCheck();
}
localStorageArray[idx] = builder.build();
successCount.incrementAndGet();
} catch (Exception e) {
LOG.error("LocalStorage init failed!", e);
} finally {
countDownLatch.countDown();
}
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
LOG.error("Failed to wait initializing local storage.", e);
}
executorService.shutdown();
int failedCount = storageBasePaths.size() - successCount.get();
long maxFailedNumber = conf.getLong(LOCAL_STORAGE_INITIALIZE_MAX_FAIL_NUMBER);
if (failedCount > maxFailedNumber || successCount.get() == 0) {
throw new RssException(
String.format(
"Initialize %s local storage(s) failed, "
+ "specified local storage paths size: %s, the conf of %s size: %s",
failedCount,
localStorageArray.length,
LOCAL_STORAGE_INITIALIZE_MAX_FAIL_NUMBER.key(),
maxFailedNumber));
}
localStorages =
Arrays.stream(localStorageArray).filter(Objects::nonNull).collect(Collectors.toList());
LOG.info(
"Succeed to initialize storage paths: {}",
StringUtils.join(
localStorages.stream().map(LocalStorage::getBasePath).collect(Collectors.toList())));
this.checker = new LocalStorageChecker(conf, localStorages);
isStorageAuditLogEnabled =
conf.getReconfigurableConf(ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED).get();
ReconfigurableRegistry.register(
ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED.toString(),
(rssConf, changedProperties) -> {
if (changedProperties == null || rssConf == null) {
return;
}
if (changedProperties.contains(
ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED.key())) {
isStorageAuditLogEnabled =
rssConf.getBoolean(ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED);
}
});
}