public BulkSparkConf()

in cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java [157:232]


    public BulkSparkConf(SparkConf conf, Map<String, String> options)
    {
        this.conf = conf;
        Optional<Integer> sidecarPortFromOptions = MapUtils.getOptionalInt(options, WriterOptions.SIDECAR_PORT.name(), "sidecar port");
        this.userProvidedSidecarPort = sidecarPortFromOptions.isPresent() ? sidecarPortFromOptions.get() : getOptionalInt(SIDECAR_PORT).orElse(-1);
        this.effectiveSidecarPort = this.userProvidedSidecarPort == -1 ? DEFAULT_SIDECAR_PORT : this.userProvidedSidecarPort;
        this.sidecarContactPointsValue = resolveSidecarContactPoints(options);
        this.keyspace = MapUtils.getOrThrow(options, WriterOptions.KEYSPACE.name());
        this.table = MapUtils.getOrThrow(options, WriterOptions.TABLE.name());
        this.skipExtendedVerify = MapUtils.getBoolean(options, WriterOptions.SKIP_EXTENDED_VERIFY.name(), true,
                                                      "skip extended verification of SSTables by Cassandra");
        this.consistencyLevel = ConsistencyLevel.CL.valueOf(MapUtils.getOrDefault(options, WriterOptions.BULK_WRITER_CL.name(), "EACH_QUORUM"));
        String dc = MapUtils.getOrDefault(options, WriterOptions.LOCAL_DC.name(), null);
        if (!consistencyLevel.isLocal() && dc != null)
        {
            LOGGER.warn("localDc is present for non-local consistency level {} specified in writer options. Correcting localDc to null", consistencyLevel);
            dc = null;
        }
        this.localDC = dc;
        this.numberSplits = MapUtils.getInt(options, WriterOptions.NUMBER_SPLITS.name(), DEFAULT_NUM_SPLITS, "number of splits");
        this.sstableDataSizeInMiB = resolveSSTableDataSizeInMiB(options);
        this.commitBatchSize = MapUtils.getInt(options, WriterOptions.COMMIT_BATCH_SIZE.name(), DEFAULT_COMMIT_BATCH_SIZE, "commit batch size");
        this.commitThreadsPerInstance = MapUtils.getInt(options, WriterOptions.COMMIT_THREADS_PER_INSTANCE.name(), 2, "commit threads per instance");
        this.keystorePassword = MapUtils.getOrDefault(options, WriterOptions.KEYSTORE_PASSWORD.name(), null);
        this.keystorePath = MapUtils.getOrDefault(options, WriterOptions.KEYSTORE_PATH.name(), null);
        this.keystoreBase64Encoded = MapUtils.getOrDefault(options, WriterOptions.KEYSTORE_BASE64_ENCODED.name(), null);
        this.keystoreType = MapUtils.getOrDefault(options, WriterOptions.KEYSTORE_TYPE.name(), "PKCS12");
        this.truststorePassword = MapUtils.getOrDefault(options, WriterOptions.TRUSTSTORE_PASSWORD.name(), null);
        this.truststorePath = MapUtils.getOrDefault(options, WriterOptions.TRUSTSTORE_PATH.name(), null);
        this.truststoreBase64Encoded = MapUtils.getOrDefault(options, WriterOptions.TRUSTSTORE_BASE64_ENCODED.name(), null);
        this.truststoreType = MapUtils.getOrDefault(options, WriterOptions.TRUSTSTORE_TYPE.name(), null);
        this.writeMode = MapUtils.getEnumOption(options, WriterOptions.WRITE_MODE.name(), WriteMode.INSERT, "write mode");
        // For backwards-compatibility with port settings, use writer option if available,
        // else fall back to props, and then default if neither specified
        this.useOpenSsl = getBoolean(USE_OPENSSL, true);
        this.ringRetryCount = getInt(RING_RETRY_COUNT, DEFAULT_RING_RETRY_COUNT);
        this.importCoordinatorTimeoutMultiplier = getDouble(IMPORT_COORDINATOR_TIMEOUT_MULTIPLIER, 0.5);
        this.ttl = MapUtils.getOrDefault(options, WriterOptions.TTL.name(), null);
        this.timestamp = MapUtils.getOrDefault(options, WriterOptions.TIMESTAMP.name(), null);
        this.quoteIdentifiers = MapUtils.getBoolean(options, WriterOptions.QUOTE_IDENTIFIERS.name(), false, "quote identifiers");
        int storageClientConcurrency = MapUtils.getInt(options, WriterOptions.STORAGE_CLIENT_CONCURRENCY.name(),
                                                       DEFAULT_STORAGE_CLIENT_CONCURRENCY, "storage client concurrency");
        long storageClientKeepAliveSeconds = MapUtils.getLong(options, WriterOptions.STORAGE_CLIENT_THREAD_KEEP_ALIVE_SECONDS.name(),
                                                              DEFAULT_STORAGE_CLIENT_KEEP_ALIVE_SECONDS);
        int storageClientMaxChunkSizeInBytes = MapUtils.getInt(options, WriterOptions.STORAGE_CLIENT_MAX_CHUNK_SIZE_IN_BYTES.name(),
                                                               DEFAULT_STORAGE_CLIENT_MAX_CHUNK_SIZE_IN_BYTES);
        String storageClientHttpsProxy = MapUtils.getOrDefault(options, WriterOptions.STORAGE_CLIENT_HTTPS_PROXY.name(), null);
        String storageClientEndpointOverride = MapUtils.getOrDefault(options, WriterOptions.STORAGE_CLIENT_ENDPOINT_OVERRIDE.name(), null);
        long nioHttpClientConnectionAcquisitionTimeoutSeconds =
        MapUtils.getLong(options, WriterOptions.STORAGE_CLIENT_NIO_HTTP_CLIENT_CONNECTION_ACQUISITION_TIMEOUT_SECONDS.name(), 300);
        int nioHttpClientMaxConcurrency = MapUtils.getInt(options, WriterOptions.STORAGE_CLIENT_NIO_HTTP_CLIENT_MAX_CONCURRENCY.name(), 50);
        this.storageClientConfig = new StorageClientConfig(storageClientConcurrency,
                                                           storageClientKeepAliveSeconds,
                                                           storageClientMaxChunkSizeInBytes,
                                                           storageClientHttpsProxy,
                                                           storageClientEndpointOverride,
                                                           nioHttpClientConnectionAcquisitionTimeoutSeconds,
                                                           nioHttpClientMaxConcurrency);
        DataTransport dataTransport = MapUtils.getEnumOption(options, WriterOptions.DATA_TRANSPORT.name(), DataTransport.DIRECT, "Data Transport");
        long maxSizePerSSTableBundleInBytesS3Transport = MapUtils.getLong(options, WriterOptions.MAX_SIZE_PER_SSTABLE_BUNDLE_IN_BYTES_S3_TRANSPORT.name(),
                                                                          DEFAULT_MAX_SIZE_PER_SSTABLE_BUNDLE_IN_BYTES_S3_TRANSPORT);
        String transportExtensionClass = MapUtils.getOrDefault(options, WriterOptions.DATA_TRANSPORT_EXTENSION_CLASS.name(), null);
        this.dataTransportInfo = new DataTransportInfo(dataTransport, transportExtensionClass, maxSizePerSSTableBundleInBytesS3Transport);
        this.jobKeepAliveMinutes = MapUtils.getInt(options, WriterOptions.JOB_KEEP_ALIVE_MINUTES.name(), MINIMUM_JOB_KEEP_ALIVE_MINUTES);
        if (this.jobKeepAliveMinutes < MINIMUM_JOB_KEEP_ALIVE_MINUTES)
        {
            throw new IllegalArgumentException(String.format("Invalid value for the '%s' Bulk Writer option (%d). It cannot be less than the minimum %s",
                                                             WriterOptions.JOB_KEEP_ALIVE_MINUTES, jobKeepAliveMinutes, MINIMUM_JOB_KEEP_ALIVE_MINUTES));
        }
        this.jobTimeoutSeconds = MapUtils.getLong(options, WriterOptions.JOB_TIMEOUT_SECONDS.name(), -1L);
        this.configuredJobId = MapUtils.getOrDefault(options, WriterOptions.JOB_ID.name(), null);
        this.coordinatedWriteConfJson = MapUtils.getOrDefault(options, WriterOptions.COORDINATED_WRITE_CONFIG.name(), null);
        this.coordinatedWriteConf = buildCoordinatedWriteConf(dataTransportInfo.getTransport());
        this.digestAlgorithmSupplier = digestAlgorithmSupplierFromOptions(dataTransport, options);
        validateEnvironment();
    }