in cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java [157:232]
/**
 * Builds the bulk writer configuration from the Spark configuration and the writer options map.
 * <p>
 * Most settings are looked up in {@code options} under the corresponding {@link WriterOptions}
 * name, with a default applied when the option is missing. A few settings (OpenSSL usage, ring
 * retry count, import coordinator timeout multiplier) are read through this class's own
 * {@code getBoolean}/{@code getInt}/{@code getDouble} helpers and do not consult {@code options}.
 * The constructor finishes by calling {@code validateEnvironment()}.
 *
 * @param conf    the Spark configuration retained by this instance
 * @param options the writer options supplied for the bulk write job; keyspace and table are
 *                fetched with {@code MapUtils.getOrThrow} and are therefore expected to be present
 * @throws IllegalArgumentException if the configured job keep-alive minutes is less than
 *                                  {@code MINIMUM_JOB_KEEP_ALIVE_MINUTES}
 */
public BulkSparkConf(SparkConf conf, Map<String, String> options)
{
    this.conf = conf;

    // For backwards-compatibility with port settings, use writer option if available,
    // else fall back to props, and then default if neither specified
    Optional<Integer> optionSidecarPort = MapUtils.getOptionalInt(options, WriterOptions.SIDECAR_PORT.name(), "sidecar port");
    this.userProvidedSidecarPort = optionSidecarPort.orElseGet(() -> getOptionalInt(SIDECAR_PORT).orElse(-1));
    // -1 is the marker for "no port supplied by the user"
    this.effectiveSidecarPort = this.userProvidedSidecarPort != -1 ? this.userProvidedSidecarPort : DEFAULT_SIDECAR_PORT;
    this.sidecarContactPointsValue = resolveSidecarContactPoints(options);

    // Target table and verification behaviour
    this.keyspace = MapUtils.getOrThrow(options, WriterOptions.KEYSPACE.name());
    this.table = MapUtils.getOrThrow(options, WriterOptions.TABLE.name());
    this.skipExtendedVerify = MapUtils.getBoolean(options, WriterOptions.SKIP_EXTENDED_VERIFY.name(), true,
                                                  "skip extended verification of SSTables by Cassandra");

    // Consistency level and local datacenter; a local DC is only retained for local consistency levels
    String consistencyLevelName = MapUtils.getOrDefault(options, WriterOptions.BULK_WRITER_CL.name(), "EACH_QUORUM");
    this.consistencyLevel = ConsistencyLevel.CL.valueOf(consistencyLevelName);
    String requestedLocalDc = MapUtils.getOrDefault(options, WriterOptions.LOCAL_DC.name(), null);
    if (consistencyLevel.isLocal() || requestedLocalDc == null)
    {
        this.localDC = requestedLocalDc;
    }
    else
    {
        LOGGER.warn("localDc is present for non-local consistency level {} specified in writer options. Correcting localDc to null", consistencyLevel);
        this.localDC = null;
    }

    // Sizing and commit settings
    this.numberSplits = MapUtils.getInt(options, WriterOptions.NUMBER_SPLITS.name(), DEFAULT_NUM_SPLITS, "number of splits");
    this.sstableDataSizeInMiB = resolveSSTableDataSizeInMiB(options);
    this.commitBatchSize = MapUtils.getInt(options, WriterOptions.COMMIT_BATCH_SIZE.name(), DEFAULT_COMMIT_BATCH_SIZE, "commit batch size");
    this.commitThreadsPerInstance = MapUtils.getInt(options, WriterOptions.COMMIT_THREADS_PER_INSTANCE.name(), 2, "commit threads per instance");

    // Keystore / truststore settings; only the keystore type has a non-null default
    this.keystorePassword = MapUtils.getOrDefault(options, WriterOptions.KEYSTORE_PASSWORD.name(), null);
    this.keystorePath = MapUtils.getOrDefault(options, WriterOptions.KEYSTORE_PATH.name(), null);
    this.keystoreBase64Encoded = MapUtils.getOrDefault(options, WriterOptions.KEYSTORE_BASE64_ENCODED.name(), null);
    this.keystoreType = MapUtils.getOrDefault(options, WriterOptions.KEYSTORE_TYPE.name(), "PKCS12");
    this.truststorePassword = MapUtils.getOrDefault(options, WriterOptions.TRUSTSTORE_PASSWORD.name(), null);
    this.truststorePath = MapUtils.getOrDefault(options, WriterOptions.TRUSTSTORE_PATH.name(), null);
    this.truststoreBase64Encoded = MapUtils.getOrDefault(options, WriterOptions.TRUSTSTORE_BASE64_ENCODED.name(), null);
    this.truststoreType = MapUtils.getOrDefault(options, WriterOptions.TRUSTSTORE_TYPE.name(), null);

    this.writeMode = MapUtils.getEnumOption(options, WriterOptions.WRITE_MODE.name(), WriteMode.INSERT, "write mode");

    // These three are not looked up in the writer options map
    this.useOpenSsl = getBoolean(USE_OPENSSL, true);
    this.ringRetryCount = getInt(RING_RETRY_COUNT, DEFAULT_RING_RETRY_COUNT);
    this.importCoordinatorTimeoutMultiplier = getDouble(IMPORT_COORDINATOR_TIMEOUT_MULTIPLIER, 0.5);

    this.ttl = MapUtils.getOrDefault(options, WriterOptions.TTL.name(), null);
    this.timestamp = MapUtils.getOrDefault(options, WriterOptions.TIMESTAMP.name(), null);
    this.quoteIdentifiers = MapUtils.getBoolean(options, WriterOptions.QUOTE_IDENTIFIERS.name(), false, "quote identifiers");

    // Storage client settings, gathered in the order StorageClientConfig expects them
    int clientConcurrency = MapUtils.getInt(options, WriterOptions.STORAGE_CLIENT_CONCURRENCY.name(),
                                            DEFAULT_STORAGE_CLIENT_CONCURRENCY, "storage client concurrency");
    long clientKeepAliveSeconds = MapUtils.getLong(options, WriterOptions.STORAGE_CLIENT_THREAD_KEEP_ALIVE_SECONDS.name(),
                                                   DEFAULT_STORAGE_CLIENT_KEEP_ALIVE_SECONDS);
    int clientMaxChunkSizeInBytes = MapUtils.getInt(options, WriterOptions.STORAGE_CLIENT_MAX_CHUNK_SIZE_IN_BYTES.name(),
                                                    DEFAULT_STORAGE_CLIENT_MAX_CHUNK_SIZE_IN_BYTES);
    String clientHttpsProxy = MapUtils.getOrDefault(options, WriterOptions.STORAGE_CLIENT_HTTPS_PROXY.name(), null);
    String clientEndpointOverride = MapUtils.getOrDefault(options, WriterOptions.STORAGE_CLIENT_ENDPOINT_OVERRIDE.name(), null);
    long connectionAcquisitionTimeoutSeconds =
    MapUtils.getLong(options, WriterOptions.STORAGE_CLIENT_NIO_HTTP_CLIENT_CONNECTION_ACQUISITION_TIMEOUT_SECONDS.name(), 300);
    int httpClientMaxConcurrency = MapUtils.getInt(options, WriterOptions.STORAGE_CLIENT_NIO_HTTP_CLIENT_MAX_CONCURRENCY.name(), 50);
    this.storageClientConfig = new StorageClientConfig(clientConcurrency,
                                                       clientKeepAliveSeconds,
                                                       clientMaxChunkSizeInBytes,
                                                       clientHttpsProxy,
                                                       clientEndpointOverride,
                                                       connectionAcquisitionTimeoutSeconds,
                                                       httpClientMaxConcurrency);

    // Data transport settings
    DataTransport transport = MapUtils.getEnumOption(options, WriterOptions.DATA_TRANSPORT.name(), DataTransport.DIRECT, "Data Transport");
    long maxBundleSizeInBytes = MapUtils.getLong(options, WriterOptions.MAX_SIZE_PER_SSTABLE_BUNDLE_IN_BYTES_S3_TRANSPORT.name(),
                                                 DEFAULT_MAX_SIZE_PER_SSTABLE_BUNDLE_IN_BYTES_S3_TRANSPORT);
    String extensionClassName = MapUtils.getOrDefault(options, WriterOptions.DATA_TRANSPORT_EXTENSION_CLASS.name(), null);
    this.dataTransportInfo = new DataTransportInfo(transport, extensionClassName, maxBundleSizeInBytes);

    // Job lifecycle settings; the keep-alive defaults to the minimum and may not go below it
    this.jobKeepAliveMinutes = MapUtils.getInt(options, WriterOptions.JOB_KEEP_ALIVE_MINUTES.name(), MINIMUM_JOB_KEEP_ALIVE_MINUTES);
    if (MINIMUM_JOB_KEEP_ALIVE_MINUTES > this.jobKeepAliveMinutes)
    {
        throw new IllegalArgumentException(String.format("Invalid value for the '%s' Bulk Writer option (%d). It cannot be less than the minimum %s",
                                                         WriterOptions.JOB_KEEP_ALIVE_MINUTES, jobKeepAliveMinutes, MINIMUM_JOB_KEEP_ALIVE_MINUTES));
    }
    this.jobTimeoutSeconds = MapUtils.getLong(options, WriterOptions.JOB_TIMEOUT_SECONDS.name(), -1L);
    this.configuredJobId = MapUtils.getOrDefault(options, WriterOptions.JOB_ID.name(), null);

    // Coordinated write configuration: the raw JSON is stored first, then the conf is built
    // NOTE(review): buildCoordinatedWriteConf presumably reads coordinatedWriteConfJson - keep this order
    this.coordinatedWriteConfJson = MapUtils.getOrDefault(options, WriterOptions.COORDINATED_WRITE_CONFIG.name(), null);
    this.coordinatedWriteConf = buildCoordinatedWriteConf(dataTransportInfo.getTransport());
    this.digestAlgorithmSupplier = digestAlgorithmSupplierFromOptions(transport, options);

    // Runs last, once every field above has been assigned
    validateEnvironment();
}