public Collection upload()

in teamcity-s3-sdk/src/main/java/jetbrains/buildServer/artifacts/s3/publish/presigned/upload/S3SignedUrlFileUploader.java [52:138]


  /**
   * Uploads the given files to S3 through pre-signed URLs and returns the collected upload statistics.
   * <p>
   * Every file is mapped to an object key built from the configured path prefix and its normalized artifact path.
   * When several files map to the same object key only the last one (in iteration order of {@code filesToUpload})
   * is uploaded, and a warning is written to the build log. If the consistency check is enabled, a digest is
   * precalculated for every non-multipart file that will actually be uploaded.
   *
   * @param filesToUpload      files to publish mapped to their target artifact paths
   * @param interrupter        passed through to every upload task; presumably reports the reason why publishing
   *                           should be stopped - confirm against {@code createUpload}
   * @param uploadInfoConsumer receives the upload info of every completed upload; anything it throws is logged and ignored
   * @return all records gathered by the statistics logger during this call
   * @throws FileUploadFailedException if publishing fails for a reason other than an interruption
   */
  public Collection<UploadStatistics> upload(@NotNull Map<File, String> filesToUpload,
                                             @NotNull Supplier<String> interrupter,
                                             Consumer<FileUploadInfo> uploadInfoConsumer) {
    LOGGER.debug(() -> "Publishing artifacts using S3 configuration " + myS3Configuration);

    final boolean consistencyCheckEnabled = myS3Configuration.getAdvancedConfiguration().isConsistencyCheckEnabled();
    final Map<String, String> precalculatedDigests = new HashMap<>();

    final Map<String, FileWithArtifactPath> normalizedObjectPaths = new HashMap<>();
    for (Map.Entry<File, String> entry : filesToUpload.entrySet()) {
      final File file = entry.getKey();
      final String artifactPath = S3Util.normalizeArtifactPath(entry.getValue(), file);
      final String objectKey = myS3Configuration.getPathPrefix() + artifactPath;

      final FileWithArtifactPath existingMapping = normalizedObjectPaths.get(objectKey);
      if (existingMapping != null && !existingMapping.getFile().equals(file)) {
        myLogger.warn("Found clashing artifacts path: " + artifactPath + " leading to different files [" + existingMapping.getFile().getPath() + "," + file.getPath() + "].\n" +
                      "Only the last file will be uploaded to the specified artifact path.");
        // The digest recorded so far describes the file that is being replaced. Drop it so that the digest
        // kept for this key (if any) always matches the file that is really uploaded.
        precalculatedDigests.remove(objectKey);
      }
      try {
        if (consistencyCheckEnabled && !precalculatedDigests.containsKey(objectKey) && !isMultipartUpload(file)) {
          precalculatedDigests.put(objectKey, getDigest(file));
        }
      } catch (IOException e) {
        // Best effort: the file is still uploaded, just without a precalculated digest.
        LOGGER.warnAndDebugDetails("Failed to calculate digest for " + file, e);
      }
      normalizedObjectPaths.put(objectKey, FileWithArtifactPath.create(artifactPath, file));
    }

    final StatisticsLogger statisticsLogger = new StatisticsLogger();

    final Retrier retrier = defaultAwsRetrier(myS3Configuration.getAdvancedConfiguration().getRetriesNum(), myS3Configuration.getAdvancedConfiguration().getRetryDelay(), LOGGER);

    try (final CloseableS3SignedUrlUploadPool multipartUploadPool = new CloseableS3SignedUrlUploadPool(myS3Configuration.getAdvancedConfiguration().getNThreads());
         final LowLevelS3Client lowLevelS3Client = createAwsClient(myS3Configuration)) {
      final S3SignedUploadManager uploadManager = new S3SignedUploadManager(myPresignedUrlsProviderClient.get(),
                                                                            myS3Configuration.getAdvancedConfiguration(),
                                                                            normalizedObjectPaths.keySet(),
                                                                            precalculatedDigests);

      // Supplier form keeps the message from being built when debug logging is disabled.
      LOGGER.debug(() -> "Publishing [" + filesToUpload.keySet().stream().map(File::getName).collect(Collectors.joining(",")) + "] to S3");
      // NOTE(review): this is a sequential, lazily evaluated stream, so every entry is submitted to the pool and
      // awaited before the next entry is submitted. The handling of rejected submissions after shutdownPool()
      // appears to rely on that ordering - confirm before switching to submitting all tasks up front.
      normalizedObjectPaths.entrySet()
                           .stream()
                           .map(objectKeyToFileWithArtifactPath -> {
                             try {
                               return multipartUploadPool.submit(() -> retrier
                                 .execute(createUpload(interrupter, statisticsLogger, lowLevelS3Client, uploadManager, objectKeyToFileWithArtifactPath)));
                             } catch (RejectedExecutionException e) {
                               if (isPoolTerminating(multipartUploadPool)) {
                                 LOGGER.debug("Artifact publishing rejected by pool shutdown");
                               } else {
                                 LOGGER.warnAndDebugDetails("Artifact publishing rejected by pool", e);
                               }
                               // Rejected submissions are filtered out below.
                               return null;
                             }
                           })
                           .filter(Objects::nonNull)
                           .map(future -> waitForCompletion(future, e -> {
                             logPublishingError(e);
                             if (isPublishingInterruptedException(e)) {
                               // Interruption: stop accepting new uploads instead of failing the whole publishing.
                               shutdownPool(multipartUploadPool);
                             } else {
                               ExceptionUtil.rethrowAsRuntimeException(e);
                             }
                           }))
                           .filter(Objects::nonNull)
                           .forEach(uploadInfo -> {
                             try {
                               uploadInfoConsumer.accept(uploadInfo);
                             } catch (Throwable t) {
                               // A failing consumer must not break publishing of the remaining artifacts.
                               LOGGER.warnAndDebugDetails("Failed to send artifact upload information to consumer", t);
                             }
                           });
    } catch (Throwable th) {
      if (isPublishingInterruptedException(th)) {
        // Interruption is not reported as a failure; statistics gathered so far are still returned.
        LOGGER.info("Publishing is interrupted " + th.getMessage(), th);
      } else {
        if (th instanceof FileUploadFailedException) {
          // Already the exception type exposed to callers - propagate without wrapping.
          throw th;
        }
        LOGGER.warnAndDebugDetails("Got error while uploading artifacts " + th.getMessage(), th);
        throw new FileUploadFailedException(th.getMessage(), false, th);
      }
    }
    return statisticsLogger.getAllRecords();
  }