private void downloadParts()

in s3-artifact-storage-agent/src/main/java/jetbrains/buildServer/artifacts/s3/download/parallel/strategy/impl/AbstractParallelDownloadStrategy.java [94:143]


  /**
   * Submits one asynchronous download task per file part to the executor of the given context
   * and waits for all submitted part futures to complete.
   * <p>
   * Parts are no longer submitted once the download state reports an interruption or a failed part.
   * A part task that fails records the failure in the download state and cancels every part future
   * registered so far.
   *
   * @param srcUrl          source URL handed to each part download
   * @param fileParts       parts to download; tasks are submitted in list order
   * @param targetFile      target file handed to each part download
   * @param fileSize        value passed to {@code downloadState.expectDownloadedBytes}
   * @param downloadState   shared state used to detect interruption and to record failed parts
   * @param downloadContext supplies the executor the part tasks are submitted to
   * @throws IOException if waiting for the part futures fails and the failure is neither an {@link Error}
   *                     nor rethrown as a recorded part failure; the helper methods invoked here
   *                     presumably throw it as well (verify against their declarations)
   */
  private void downloadParts(@NotNull String srcUrl,
                             @NotNull List<FilePart> fileParts,
                             @NotNull Path targetFile,
                             long fileSize,
                             @NotNull ParallelDownloadState downloadState,
                             @NotNull ParallelDownloadContext downloadContext) throws IOException {
    downloadState.expectDownloadedBytes(fileSize);

    // failing part tasks iterate this list while the current thread may still be adding to it
    List<CompletableFuture<Void>> partDownloadFutures = new CopyOnWriteArrayList<>();
    for (FilePart filePart : fileParts) {
      // skip submission when interrupted or when a failure has already been detected
      if (downloadState.isInterrupted() || downloadState.hasFailedParts()) continue;

      Runnable partDownloadTask = () -> {
        try {
          String partDescription = filePart.getDescription();
          LOGGER.debug(String.format("Start downloading part %s", partDescription));
          downloadPart(srcUrl, filePart, targetFile, downloadState, downloadContext);
          LOGGER.debug(String.format("Part %s downloaded", partDescription));
        } catch (Exception e) {
          LOGGER.debug(String.format("Failed to download part %s: %s", filePart.getDescription(), e.getMessage()), e);
          downloadState.partFailed(filePart, new IOException("Failed to download part " + filePart.getDescription(), e));
          // intended to keep part downloads that have not started yet from running
          // NOTE(review): futures of parts that are already running are cancelled too, so the wait below
          // may end while those tasks are still executing - confirm this is acceptable for callers
          for (CompletableFuture<Void> registeredFuture : partDownloadFutures) {
            registeredFuture.cancel(false);
          }
        }
      };
      // NOTE(review): an unchecked exception thrown by the executor during submission would escape here - confirm
      partDownloadFutures.add(CompletableFuture.runAsync(partDownloadTask, downloadContext.getExecutor()));
    }

    // wait until part downloads finish and check for uncaught errors and executor level exceptions
    CompletableFuture<Void> allPartsDownloadFuture = CompletableFuture.allOf(partDownloadFutures.toArray(new CompletableFuture[0]));
    checkDownloadInterrupted(downloadState);
    try {
      allPartsDownloadFuture.join();
    } catch (CompletionException joinFailure) {
      Throwable partFailure = joinFailure.getCause();
      Throwable nestedFailure = partFailure.getCause();

      // an Error raised by the executor or by downloadPart is propagated unchanged
      if (partFailure instanceof Error) throw (Error)partFailure;
      if (nestedFailure instanceof Error) throw (Error)nestedFailure;

      // the failure may only be a cancellation triggered by a failed part; prefer that part's own exception
      rethrowPartExceptionIfDownloadFailed(downloadState);

      // no part failure was rethrown, so the part tasks could not be run (e.g. executor shut down)
      throw new IOException("Failed to start all part downloads", partFailure);
    }

    // all futures completed normally; surface failures that part tasks caught and recorded
    rethrowPartExceptionIfDownloadFailed(downloadState);
  }