in s3-artifact-storage-agent/src/main/java/jetbrains/buildServer/artifacts/s3/download/parallel/strategy/impl/AbstractParallelDownloadStrategy.java [94:143]
private void downloadParts(@NotNull String srcUrl,
@NotNull List<FilePart> fileParts,
@NotNull Path targetFile,
long fileSize,
@NotNull ParallelDownloadState downloadState,
@NotNull ParallelDownloadContext downloadContext) throws IOException {
downloadState.expectDownloadedBytes(fileSize);
List<CompletableFuture<Void>> partDownloadFutures = new CopyOnWriteArrayList<>();
fileParts.stream()
.map(filePart -> {
if (downloadState.isInterrupted() || downloadState.hasFailedParts()) return null;
return CompletableFuture.runAsync(() -> {
try {
String partDescription = filePart.getDescription();
LOGGER.debug(String.format("Start downloading part %s", partDescription));
downloadPart(srcUrl, filePart, targetFile, downloadState, downloadContext);
LOGGER.debug(String.format("Part %s downloaded", partDescription));
} catch (Exception e) {
LOGGER.debug(String.format("Failed to download part %s: %s", filePart.getDescription(), e.getMessage()), e);
downloadState.partFailed(filePart, new IOException("Failed to download part " + filePart.getDescription(), e));
partDownloadFutures.forEach(future -> future.cancel(false)); // will cancel unstarted part downloads at the executor level
}
}, downloadContext.getExecutor());
})
.filter(Objects::nonNull) // null when interrupted or detected failure
.forEach(partDownloadFuture -> partDownloadFutures.add(partDownloadFuture));
// wait until part downloads finish and check for uncaught errors and executor level exceptions
CompletableFuture<Void> allPartsDownloadFuture = CompletableFuture.allOf(partDownloadFutures.toArray(new CompletableFuture[]{}));
checkDownloadInterrupted(downloadState);
try {
allPartsDownloadFuture.join();
} catch (CompletionException allPartsDownloadException) {
// rethrow Error from executor or downloadPart method
Throwable somePartException = allPartsDownloadException.getCause();
Throwable somePartExceptionCause = somePartException.getCause();
if (somePartException instanceof Error) throw (Error)somePartException;
if (somePartExceptionCause != null && somePartExceptionCause instanceof Error) throw (Error)somePartExceptionCause;
// exception is from executor
// it might be a cancellation exception if some part failed, throw this part's exception instead
rethrowPartExceptionIfDownloadFailed(downloadState);
// this happens when executor fails to run any of the part tasks (e.g. executor shut down)
throw new IOException("Failed to start all part downloads", somePartException);
}
// check for caught part download exceptions
rethrowPartExceptionIfDownloadFailed(downloadState);
}