in aws-core-common/src/main/java/jetbrains/buildServer/util/amazon/S3Util.java [47:106]
public static <T extends Transfer> Collection<CompletedTransfer> withTransferManager(@NotNull final S3AsyncClient s3Client,
@NotNull final WithTransferManager<T> runnable,
@NotNull final S3AdvancedConfiguration advancedConfiguration) throws Throwable {
ExecutorService defaultExecutorService = createDefaultExecutorService(advancedConfiguration.getNThreads());
final S3TransferManager manager = S3TransferManager.builder()
.s3Client(s3Client)
.executor(defaultExecutorService)
.build();
LOG.debug(() -> "Processing with s3Client " + advancedConfiguration);
try {
final List<T> transfers = new ArrayList<>(runnable.run(manager));
final AtomicBoolean isInterrupted = new AtomicBoolean(false);
if (runnable instanceof InterruptAwareWithTransferManager) {
final TransferManagerInterruptHook hook = () -> {
isInterrupted.set(true);
for (T transfer : transfers) {
if (transfer != null) {
transfer.completionFuture().cancel(true);
}
}
};
((InterruptAwareWithTransferManager<T>)runnable).setInterruptHook(hook);
}
Collection<CompletedTransfer> completedTransfers = new ArrayList<>();
Throwable exception = null;
for (T transfer : transfers) {
try {
if (transfer != null) {
completedTransfers.add(transfer.completionFuture().join());
}
} catch (Throwable t) {
if (!isInterrupted.get()) {
if (exception != null) {
exception.addSuppressed(t);
} else {
exception = t;
}
}
}
}
if (exception != null) {
throw exception;
}
return completedTransfers;
} finally {
defaultExecutorService.shutdownNow();
if (advancedConfiguration.shouldShutdownClient()) {
shutdownClient(s3Client);
}
}
}