public Storage selectStorage()

in server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java [179:226]


  /**
   * Chooses the local storage that the given flush event should use.
   *
   * <p>The partition-to-storage cache is consulted first, keyed by the event's app id, shuffle id
   * and start partition. A cached storage that is not corrupted is returned directly (and recorded
   * on the event if the event has no under-storage yet). Otherwise a replacement is picked from the
   * storages that are currently writable and not corrupted, and the cache entry is refreshed.
   *
   * @param event the flush event; its under-storage may be set as a side effect
   * @return the storage for the event's partition, or {@code null} when no local storage is both
   *     writable and uncorrupted
   */
  public Storage selectStorage(ShuffleDataFlushEvent event) {
    final String appId = event.getAppId();
    final int shuffleId = event.getShuffleId();
    final int partitionId = event.getStartPartition();

    final LocalStorage cached =
        partitionsOfStorage.get(UnionKey.buildKey(appId, shuffleId, partitionId));

    // Fast path: the partition already maps to a healthy storage.
    if (cached != null && !cached.isCorrupted()) {
      if (event.getUnderStorage() == null) {
        event.setUnderStorage(cached);
      }
      return cached;
    }

    // Reaching here with a non-null entry means the cached storage is corrupted. Only report it
    // when that storage holds a write handler for this partition.
    if (cached != null && cached.containsWriteHandler(appId, shuffleId, partitionId)) {
      LOG.error(
          "LocalStorage: {} is corrupted. Switching another storage for event: {}, some data will be lost",
          cached.getBasePath(),
          event);
    }

    // Only storages that accept writes and are not corrupted are eligible replacements.
    final List<LocalStorage> usable =
        localStorages.stream()
            .filter(s -> s.canWrite())
            .filter(s -> !s.isCorrupted())
            .collect(Collectors.toList());
    if (usable.isEmpty()) {
      return null;
    }

    final int index =
        ShuffleStorageUtils.getStorageIndex(usable.size(), appId, shuffleId, partitionId);
    final LocalStorage chosen = usable.get(index);

    return partitionsOfStorage.compute(
        UnionKey.buildKey(appId, shuffleId, partitionId),
        (key, current) -> {
          // Keep the existing mapping only when it is present, healthy, and the event already
          // carries an under-storage. In every other case the cache is refreshed with the newly
          // chosen storage and the event is pointed at it.
          // NOTE(review): this also replaces a healthy mapping when the event has no
          // under-storage yet -- confirm that is intended.
          boolean keepCurrent =
              current != null && !current.isCorrupted() && event.getUnderStorage() != null;
          if (keepCurrent) {
            return current;
          }
          event.setUnderStorage(chosen);
          return chosen;
        });
  }