in teamcity-s3-sdk/src/main/java/jetbrains/buildServer/artifacts/s3/publish/presigned/upload/S3SignedUrlFileUploader.java [52:138]
public Collection<UploadStatistics> upload(@NotNull Map<File, String> filesToUpload,
@NotNull Supplier<String> interrupter,
Consumer<FileUploadInfo> uploadInfoConsumer) {
LOGGER.debug(() -> "Publishing artifacts using S3 configuration " + myS3Configuration);
final boolean consistencyCheckEnabled = myS3Configuration.getAdvancedConfiguration().isConsistencyCheckEnabled();
final Map<String, String> precalculatedDigests = new HashMap<>();
final Map<String, FileWithArtifactPath> normalizedObjectPaths = new HashMap<>();
for (Map.Entry<File, String> entry : filesToUpload.entrySet()) {
final File file = entry.getKey();
final String artifactPath = S3Util.normalizeArtifactPath(entry.getValue(), file);
final String objectKey = myS3Configuration.getPathPrefix() + artifactPath;
final FileWithArtifactPath existingMapping = normalizedObjectPaths.get(objectKey);
if (existingMapping != null && !existingMapping.getFile().equals(file)) {
myLogger.warn("Found clashing artifacts path: " + artifactPath + " leading to different files [" + existingMapping.getFile().getPath() + "," + file.getPath() + "].\n" +
"Only the last file will be uploaded to the specified artifact path.");
}
try {
if (consistencyCheckEnabled && !precalculatedDigests.containsKey(objectKey) && !isMultipartUpload(file)) {
precalculatedDigests.put(objectKey, getDigest(file));
}
} catch (IOException e) {
LOGGER.warnAndDebugDetails("Failed to calculate digest for " + file, e);
}
normalizedObjectPaths.put(objectKey, FileWithArtifactPath.create(artifactPath, file));
}
final StatisticsLogger statisticsLogger = new StatisticsLogger();
final Retrier retrier = defaultAwsRetrier(myS3Configuration.getAdvancedConfiguration().getRetriesNum(), myS3Configuration.getAdvancedConfiguration().getRetryDelay(), LOGGER);
try (final CloseableS3SignedUrlUploadPool multipartUploadPool = new CloseableS3SignedUrlUploadPool(myS3Configuration.getAdvancedConfiguration().getNThreads());
final LowLevelS3Client lowLevelS3Client = createAwsClient(myS3Configuration)) {
final S3SignedUploadManager uploadManager = new S3SignedUploadManager(myPresignedUrlsProviderClient.get(),
myS3Configuration.getAdvancedConfiguration(),
normalizedObjectPaths.keySet(),
precalculatedDigests);
LOGGER.debug("Publishing [" + filesToUpload.keySet().stream().map(File::getName).collect(Collectors.joining(",")) + "] to S3");
normalizedObjectPaths.entrySet()
.stream()
.map(objectKeyToFileWithArtifactPath -> {
try {
return multipartUploadPool.submit(() -> retrier
.execute(createUpload(interrupter, statisticsLogger, lowLevelS3Client, uploadManager, objectKeyToFileWithArtifactPath)));
} catch (RejectedExecutionException e) {
if (isPoolTerminating(multipartUploadPool)) {
LOGGER.debug("Artifact publishing rejected by pool shutdown");
} else {
LOGGER.warnAndDebugDetails("Artifact publishing rejected by pool", e);
}
return null;
}
})
.filter(Objects::nonNull)
.map(future -> waitForCompletion(future, e -> {
logPublishingError(e);
if (isPublishingInterruptedException(e)) {
shutdownPool(multipartUploadPool);
} else {
ExceptionUtil.rethrowAsRuntimeException(e);
}
}))
.filter(Objects::nonNull)
.forEach(uploadInfo -> {
try {
uploadInfoConsumer.accept(uploadInfo);
} catch (Throwable t) {
LOGGER.warnAndDebugDetails("Failed to send artifact upload information to consumer", t);
}
});
} catch (Throwable th) {
if (isPublishingInterruptedException(th)) {
LOGGER.info("Publishing is interrupted " + th.getMessage(), th);
} else {
if (th instanceof FileUploadFailedException) {
throw th;
}
LOGGER.warnAndDebugDetails("Got error while uploading artifacts " + th.getMessage(), th);
throw new FileUploadFailedException(th.getMessage(), false, th);
}
}
return statisticsLogger.getAllRecords();
}