public Storage selectStorage()

in server/src/main/java/org/apache/uniffle/server/storage/MultiPartLocalStorageManager.java [54:101]


  /**
   * Selects the local storage that the given flush event should be written to.
   *
   * <p>Selection rules, in order:
   *
   * <ol>
   *   <li>No storage configured: returns {@code null}.
   *   <li>Exactly one storage configured: that storage is returned unconditionally.
   *   <li>Otherwise the primary storage is the one at index {@code eventId mod storageCount}; it
   *       is returned if it is non-null and not corrupted.
   *   <li>Otherwise a fallback is chosen among the storages that are writable and not corrupted,
   *       at the index computed by {@code ShuffleStorageUtils.getStorageIndex}.
   * </ol>
   *
   * <p>As a side effect the event's under-storage is set to the returned storage when it was not
   * set before, and is overwritten when the fallback storage replaces a corrupted primary.
   *
   * @param event the flush event to place; its under-storage may be updated
   * @return the chosen storage, or {@code null} if no storage is configured or no storage is
   *     currently writable and uncorrupted
   */
  public Storage selectStorage(ShuffleDataFlushEvent event) {
    final List<LocalStorage> storages = getStorages();
    final int storageCount = storages.size();
    if (storageCount == 0) {
      // Nothing to choose from; without this guard the modulo below would divide by zero.
      return null;
    }
    if (storageCount == 1) {
      final LocalStorage onlyStorage = storages.get(0);
      if (event.getUnderStorage() == null) {
        event.setUnderStorage(onlyStorage);
      }
      return onlyStorage;
    }

    final String appId = event.getAppId();
    final int shuffleId = event.getShuffleId();
    final int partitionId = event.getStartPartition();

    // TODO(baoloongmao): extend to support select storage by free space
    // eventId is expected to be a non-negative long; floorMod gives the same index as '%' for
    // such values and keeps the index within [0, storageCount) should a negative id ever occur.
    final int primaryIndex = (int) Math.floorMod(event.getEventId(), (long) storageCount);
    final LocalStorage storage = storages.get(primaryIndex);
    if (storage != null) {
      if (!storage.isCorrupted()) {
        if (event.getUnderStorage() == null) {
          event.setUnderStorage(storage);
        }
        return storage;
      }
      // The primary storage is corrupted: report it only when it already holds a write handler
      // for this partition, then fall through to pick a replacement.
      if (storage.containsWriteHandler(appId, shuffleId, partitionId)) {
        LOG.error(
            "LocalStorage: {} is corrupted. Switching another storage for event: {}, some data will be lost",
            storage.getBasePath(),
            event);
      }
    }

    // TODO(baoloongmao): update health storages and store it as member of this class.
    final List<LocalStorage> candidates =
        storages.stream()
            .filter(x -> x.canWrite() && !x.isCorrupted())
            .collect(Collectors.toList());
    if (candidates.isEmpty()) {
      return null;
    }

    final LocalStorage selectedStorage =
        candidates.get(
            ShuffleStorageUtils.getStorageIndex(candidates.size(), appId, shuffleId, partitionId));
    if (storage == null || storage.isCorrupted() || event.getUnderStorage() == null) {
      event.setUnderStorage(selectedStorage);
      return selectedStorage;
    }
    // Reached only when the primary storage stopped reporting itself as corrupted between the
    // check above and this one while the event already had an under-storage assigned.
    return storage;
  }