in server/src/main/java/org/apache/uniffle/server/storage/MultiPartLocalStorageManager.java [54:101]
public Storage selectStorage(ShuffleDataFlushEvent event) {
if (getStorages().size() == 1) {
if (event.getUnderStorage() == null) {
event.setUnderStorage(getStorages().get(0));
}
return getStorages().get(0);
}
String appId = event.getAppId();
int shuffleId = event.getShuffleId();
int partitionId = event.getStartPartition();
// TODO(baoloongmao): extend to support select storage by free space
// eventId is a non-negative long.
LocalStorage storage = getStorages().get((int) (event.getEventId() % getStorages().size()));
if (storage != null) {
if (storage.isCorrupted()) {
if (storage.containsWriteHandler(appId, shuffleId, partitionId)) {
LOG.error(
"LocalStorage: {} is corrupted. Switching another storage for event: {}, some data will be lost",
storage.getBasePath(),
event);
}
} else {
if (event.getUnderStorage() == null) {
event.setUnderStorage(storage);
}
return storage;
}
}
// TODO(baoloongmao): update health storages and store it as member of this class.
List<LocalStorage> candidates =
getStorages().stream()
.filter(x -> x.canWrite() && !x.isCorrupted())
.collect(Collectors.toList());
if (candidates.size() == 0) {
return null;
}
final LocalStorage selectedStorage =
candidates.get(
ShuffleStorageUtils.getStorageIndex(candidates.size(), appId, shuffleId, partitionId));
if (storage == null || storage.isCorrupted() || event.getUnderStorage() == null) {
event.setUnderStorage(selectedStorage);
return selectedStorage;
}
return storage;
}