public StorageClient()

in cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/StorageClient.java [82:136]


    /**
     * Creates a storage client backed by an {@link S3AsyncClient}.
     * <p>
     * The constructor:
     * <ol>
     *   <li>creates a daemon thread pool and registers it as the SDK's future-completion executor;</li>
     *   <li>configures the S3 client with the region taken from the write access configuration;</li>
     *   <li>applies the endpoint override (with path-style access forced) when one is configured;</li>
     *   <li>configures a Netty NIO HTTP client that goes through the HTTPS proxy when one is configured;</li>
     *   <li>prepares the data chunker, the object tagging and an empty credentials cache.</li>
     * </ol>
     *
     * @param storageTransportConfiguration supplies the write-access region and the object tags
     * @param storageClientConfig           supplies thread pool sizing, thread naming, the optional endpoint
     *                                      override, the optional HTTPS proxy, the NIO HTTP client settings
     *                                      and the maximum chunk size
     */
    public StorageClient(StorageTransportConfiguration storageTransportConfiguration,
                         StorageClientConfig storageClientConfig)
    {
        this.storageTransportConfiguration = storageTransportConfiguration;
        ThreadPoolExecutor executor = new ThreadPoolExecutor(storageClientConfig.concurrency, // core
                                                             storageClientConfig.concurrency, // max
                                                             // keep alive
                                                             storageClientConfig.threadKeepAliveSeconds, TimeUnit.SECONDS,
                                                             new LinkedBlockingQueue<>(), // unbounded work queue
                                                             new ThreadFactoryBuilder().threadNamePrefix(storageClientConfig.threadNamePrefix)
                                                                                       .daemonThreads(true)
                                                                                       .build());
        // Must set it to allow threads to time out, so that it can release resources when idle.
        executor.allowCoreThreadTimeOut(true);
        Map<SdkAdvancedAsyncClientOption<?>, ?> advancedOptions = Collections.singletonMap(
        SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR, executor
        );

        String region = storageTransportConfiguration.writeAccessConfiguration().region();
        S3AsyncClientBuilder clientBuilder = S3AsyncClient.builder()
                                                          .region(Region.of(region))
                                                          .asyncConfiguration(b -> b.advancedOptions(advancedOptions));
        if (storageClientConfig.endpointOverride != null)
        {
            // A custom endpoint is addressed with path-style URLs rather than virtual-hosted-style ones.
            clientBuilder.endpointOverride(storageClientConfig.endpointOverride)
                         .forcePathStyle(true);
        }
        if (storageClientConfig.httpsProxy != null)
        {
            ProxyConfiguration proxyConfig = ProxyConfiguration.builder()
                                                               .host(storageClientConfig.httpsProxy.getHost())
                                                               .port(storageClientConfig.httpsProxy.getPort())
                                                               .scheme(storageClientConfig.httpsProxy.getScheme())
                                                               .build();
            Duration connectionAcquisitionTimeout = Duration.ofSeconds(storageClientConfig.nioHttpClientConnectionAcquisitionTimeoutSeconds);
            // Hand the SDK the HTTP client *builder* instead of an already-built client.
            // A client passed through httpClient(...) is not closed by the SDK when the S3 client is closed,
            // and no reference to it is kept here, so its resources could never be released.
            // With httpClientBuilder(...) the SDK owns the HTTP client and closes it together with the S3 client.
            // NOTE(review): the acquisition timeout and max concurrency are only applied when a proxy is
            // configured - confirm that is intended.
            clientBuilder.httpClientBuilder(NettyNioAsyncHttpClient.builder()
                                                                   .proxyConfiguration(proxyConfig)
                                                                   .connectionAcquisitionTimeout(connectionAcquisitionTimeout)
                                                                   .maxConcurrency(storageClientConfig.nioHttpClientMaxConcurrency));
        }
        this.client = clientBuilder.build();
        this.dataChunker = new DataChunker(storageClientConfig.maxChunkSizeInBytes);
        // Convert the configured object tags (key/value entries) into the SDK's tag set once, up front.
        List<Tag> tags = this.storageTransportConfiguration.getObjectTags()
                                                           .entrySet()
                                                           .stream()
                                                           .map(entry -> Tag.builder()
                                                                            .key(entry.getKey())
                                                                            .value(entry.getValue())
                                                                            .build())
                                                           .collect(Collectors.toList());
        this.tagging = Tagging.builder().tagSet(tags).build();
        this.credentialsCache = new ConcurrentHashMap<>();
    }