in cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/StorageClient.java [82:136]
/**
 * Creates a storage client backed by an {@link S3AsyncClient}.
 * <p>
 * The constructor wires up, in order:
 * <ul>
 *   <li>a fixed-size daemon thread pool (core == max == {@code storageClientConfig.concurrency},
 *       unbounded work queue, idle threads allowed to time out) registered with the SDK as the
 *       {@code FUTURE_COMPLETION_EXECUTOR};</li>
 *   <li>the S3 client for the region returned by the write access configuration, with an optional
 *       endpoint override (path-style access is forced when the override is set) and an optional
 *       HTTPS proxy;</li>
 *   <li>the {@link DataChunker} sized by {@code storageClientConfig.maxChunkSizeInBytes};</li>
 *   <li>the object {@link Tagging} derived from the transport configuration's object tags;</li>
 *   <li>an empty credentials cache.</li>
 * </ul>
 *
 * @param storageTransportConfiguration transport configuration; provides the write-access region
 *                                      and the object tags
 * @param storageClientConfig           client settings: concurrency, thread keep-alive and name prefix,
 *                                      optional endpoint override, optional HTTPS proxy, Netty client
 *                                      limits and maximum chunk size
 */
public StorageClient(StorageTransportConfiguration storageTransportConfiguration,
                     StorageClientConfig storageClientConfig)
{
    this.storageTransportConfiguration = storageTransportConfiguration;

    ThreadPoolExecutor executor = new ThreadPoolExecutor(storageClientConfig.concurrency, // core
                                                         storageClientConfig.concurrency, // max
                                                         // keep alive
                                                         storageClientConfig.threadKeepAliveSeconds, TimeUnit.SECONDS,
                                                         new LinkedBlockingQueue<>(), // unbounded work queue
                                                         new ThreadFactoryBuilder().threadNamePrefix(storageClientConfig.threadNamePrefix)
                                                                                   .daemonThreads(true)
                                                                                   .build());
    // Must set it to allow threads to time out, so that it can release resources when idle.
    executor.allowCoreThreadTimeOut(true);
    Map<SdkAdvancedAsyncClientOption<?>, ?> advancedOptions = Collections.singletonMap(
    SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR, executor
    );

    String region = storageTransportConfiguration.writeAccessConfiguration().region();
    S3AsyncClientBuilder clientBuilder = S3AsyncClient.builder()
                                                      .region(Region.of(region))
                                                      .asyncConfiguration(b -> b.advancedOptions(advancedOptions));
    if (storageClientConfig.endpointOverride != null)
    {
        clientBuilder.endpointOverride(storageClientConfig.endpointOverride)
                     .forcePathStyle(true);
    }
    if (storageClientConfig.httpsProxy != null)
    {
        ProxyConfiguration proxyConfig = ProxyConfiguration.builder()
                                                           .host(storageClientConfig.httpsProxy.getHost())
                                                           .port(storageClientConfig.httpsProxy.getPort())
                                                           .scheme(storageClientConfig.httpsProxy.getScheme())
                                                           .build();
        Duration connectionAcquisitionTimeout = Duration.ofSeconds(storageClientConfig.nioHttpClientConnectionAcquisitionTimeoutSeconds);
        // Hand the SDK the HTTP client *builder* rather than a built client. A client passed through
        // httpClient(...) is never closed by the SDK, and no reference to it is kept here, so it could
        // not be closed at all. With httpClientBuilder(...) the SDK builds and owns the HTTP client and
        // closes it together with the S3 client.
        // NOTE(review): the connection acquisition timeout and max concurrency settings only take effect
        // when a proxy is configured - confirm that is intended.
        clientBuilder.httpClientBuilder(NettyNioAsyncHttpClient.builder()
                                                               .proxyConfiguration(proxyConfig)
                                                               .connectionAcquisitionTimeout(connectionAcquisitionTimeout)
                                                               .maxConcurrency(storageClientConfig.nioHttpClientMaxConcurrency));
    }
    this.client = clientBuilder.build();
    this.dataChunker = new DataChunker(storageClientConfig.maxChunkSizeInBytes);

    // Convert the configured key/value object tags into the SDK's tagging model once, up front.
    List<Tag> tags = this.storageTransportConfiguration.getObjectTags()
                                                       .entrySet()
                                                       .stream()
                                                       .map(entry -> Tag.builder()
                                                                        .key(entry.getKey())
                                                                        .value(entry.getValue())
                                                                        .build())
                                                       .collect(Collectors.toList());
    this.tagging = Tagging.builder().tagSet(tags).build();
    this.credentialsCache = new ConcurrentHashMap<>();
}