in storage/src/main/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandler.java [83:137]
public ShuffleDataResult readShuffleData() {
if (!prefetchEnabled) {
return doReadShuffleData();
}
int free = prefetchQueueCapacity - prefetchResultQueue.size() - queueingNumber.get();
for (int i = 0; i < free; i++) {
queueingNumber.incrementAndGet();
prefetchExecutors.submit(
() -> {
long start = System.currentTimeMillis();
try {
if (abnormalFetchTag.get() || finishedTag.get()) {
return;
}
ShuffleDataResult result = doReadShuffleData();
if (result == null) {
this.finishedTag.set(true);
}
prefetchResultQueue.offer(Optional.ofNullable(result));
} catch (Exception e) {
abnormalFetchTag.set(true);
LOG.error("Errors on doing readShuffleData", e);
} finally {
queueingNumber.decrementAndGet();
fetchTime.addAndGet(System.currentTimeMillis() - start);
}
});
}
long start = System.currentTimeMillis();
while (true) {
if (abnormalFetchTag.get()) {
throw new RssException("Fast fail due to the fetch failure");
}
try {
Optional<ShuffleDataResult> optionalShuffleDataResult =
prefetchResultQueue.poll(10, TimeUnit.MILLISECONDS);
if (optionalShuffleDataResult != null) {
if (optionalShuffleDataResult.isPresent()) {
return optionalShuffleDataResult.get();
} else {
return null;
}
}
} catch (InterruptedException e) {
return null;
}
if (System.currentTimeMillis() - start > prefetchTimeoutSec * 1000) {
throw new RssException("Unexpected duration of reading shuffle data. Fast fail!");
}
}
}