public ShuffleDataResult readShuffleData()

in storage/src/main/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandler.java [83:137]


  /**
   * Returns the next shuffle data result.
   *
   * <p>When prefetch is disabled this is a direct call to {@code doReadShuffleData()}. Otherwise
   * it tops up the background prefetch tasks so that queued plus in-flight results do not exceed
   * {@code prefetchQueueCapacity}, then waits for the next queued result.
   *
   * @return the next result, or {@code null} when a prefetch task reported that no more data is
   *     available, or when the calling thread was interrupted while waiting (the interrupt
   *     status is restored in that case)
   * @throws RssException if a prefetch task failed, or if no result arrived within {@code
   *     prefetchTimeoutSec} seconds
   */
  public ShuffleDataResult readShuffleData() {
    if (!prefetchEnabled) {
      return doReadShuffleData();
    }

    // Read the in-flight counter BEFORE the queue size. A task offers its result first and only
    // then decrements the counter, so in this order a result that is concurrently moving from
    // "in flight" to "queued" is counted at least once. The reverse order could miss it in both
    // terms and over-submit tasks.
    int inFlight = queueingNumber.get();
    int free = prefetchQueueCapacity - prefetchResultQueue.size() - inFlight;
    for (int i = 0; i < free; i++) {
      queueingNumber.incrementAndGet();
      prefetchExecutors.submit(
          () -> {
            long start = System.currentTimeMillis();
            try {
              // Skip the fetch once another task has failed or reached the end of the data.
              if (abnormalFetchTag.get() || finishedTag.get()) {
                return;
              }
              ShuffleDataResult result = doReadShuffleData();
              if (result == null) {
                // A null result marks the end of the data; it is queued as an empty Optional.
                this.finishedTag.set(true);
              }
              // offer() returns false when a bounded queue is full. Dropping the result silently
              // would lose data, so surface it as a fetch failure instead.
              if (!prefetchResultQueue.offer(Optional.ofNullable(result))) {
                abnormalFetchTag.set(true);
                LOG.error("Prefetch result queue rejected a fetched result. Fast fail!");
              }
            } catch (Exception e) {
              abnormalFetchTag.set(true);
              LOG.error("Errors on doing readShuffleData", e);
            } finally {
              queueingNumber.decrementAndGet();
              fetchTime.addAndGet(System.currentTimeMillis() - start);
            }
          });
    }

    // Converted once, in long arithmetic, so a large timeout cannot overflow.
    long timeoutMillis = TimeUnit.SECONDS.toMillis(prefetchTimeoutSec);
    long start = System.currentTimeMillis();
    while (true) {
      if (abnormalFetchTag.get()) {
        throw new RssException("Fast fail due to the fetch failure");
      }

      try {
        // Poll in short slices so a failure flagged by a prefetch task is noticed quickly.
        Optional<ShuffleDataResult> optionalShuffleDataResult =
            prefetchResultQueue.poll(10, TimeUnit.MILLISECONDS);
        if (optionalShuffleDataResult != null) {
          // An empty Optional is the end-of-data marker and maps to a null return.
          return optionalShuffleDataResult.orElse(null);
        }
      } catch (InterruptedException e) {
        // Keep the existing contract (return null) but restore the interrupt status so callers
        // further up the stack can still observe the cancellation.
        Thread.currentThread().interrupt();
        return null;
      }

      if (System.currentTimeMillis() - start > timeoutMillis) {
        throw new RssException("Unexpected duration of reading shuffle data. Fast fail!");
      }
    }
  }