in connector/src/main/scala/com/datastax/spark/connector/cql/CassandraConnectionFactory.scala [65:135]
/**
 * Layers the connector's settings from `conf` on top of `initBuilder`.
 *
 * The settings are applied as an ordered list of `PDCLB => PDCLB` steps:
 * basic pool/timeout/retry/reconnection/Netty options, the optional
 * compression setting, the optional local data center, and (only when
 * `conf.contactInfo` is an [[IpBasedContactInfo]]) contact points, the
 * load-balancing policy and the SSL options.
 *
 * @param conf        connector configuration the driver options are read from
 * @param initBuilder builder the options are applied to
 * @return the builder produced by the last applied step
 */
def connectorConfigBuilder(conf: CassandraConnectorConf, initBuilder: PDCLB): PDCLB = {

  /** Pool sizes, timeouts, retry/reconnection policies and Netty shutdown options. */
  def basicProperties(builder: PDCLB): PDCLB = {
    // Default local pool size: all available processors but one, never below 1.
    val localCoreThreadCount = Math.max(1, Runtime.getRuntime.availableProcessors() - 1)
    val connectTimeout = Duration.ofMillis(conf.connectTimeoutMillis)
    val readTimeout = Duration.ofMillis(conf.readTimeoutMillis)
    // The *Millis settings are integer-divided by 1000 before being handed to the
    // Netty shutdown options, so any remainder below 1000 ms is truncated.
    val quietPeriod = conf.quietPeriodBeforeCloseMillis / 1000
    val closeTimeout = conf.timeoutBeforeCloseMillis / 1000

    builder
      // pool sizes (moved from CassandraConnector)
      .withInt(CONNECTION_POOL_LOCAL_SIZE, conf.localConnectionsPerExecutor.getOrElse(localCoreThreadCount))
      .withInt(CONNECTION_POOL_REMOTE_SIZE, conf.remoteConnectionsPerExecutor.getOrElse(1))
      .withInt(CONNECTION_INIT_QUERY_TIMEOUT, conf.connectTimeoutMillis)
      .withDuration(CONTROL_CONNECTION_TIMEOUT, connectTimeout)
      .withDuration(METADATA_SCHEMA_REQUEST_TIMEOUT, connectTimeout)
      .withInt(REQUEST_TIMEOUT, conf.readTimeoutMillis)
      .withClass(RETRY_POLICY_CLASS, classOf[MultipleRetryPolicy])
      .withClass(RECONNECTION_POLICY_CLASS, classOf[ExponentialReconnectionPolicy])
      .withDuration(RECONNECTION_BASE_DELAY, Duration.ofMillis(conf.minReconnectionDelayMillis))
      .withDuration(RECONNECTION_MAX_DELAY, Duration.ofMillis(conf.maxReconnectionDelayMillis))
      .withInt(NETTY_ADMIN_SHUTDOWN_QUIET_PERIOD, quietPeriod)
      .withInt(NETTY_ADMIN_SHUTDOWN_TIMEOUT, closeTimeout)
      .withInt(NETTY_IO_SHUTDOWN_QUIET_PERIOD, quietPeriod)
      .withInt(NETTY_IO_SHUTDOWN_TIMEOUT, closeTimeout)
      .withBoolean(NETTY_DAEMON, true)
      .withBoolean(RESOLVE_CONTACT_POINTS, conf.resolveContactPoints)
      .withInt(MultipleRetryPolicy.MaxRetryCount, conf.queryRetryCount)
      .withDuration(DseDriverOption.CONTINUOUS_PAGING_TIMEOUT_FIRST_PAGE, readTimeout)
      .withDuration(DseDriverOption.CONTINUOUS_PAGING_TIMEOUT_OTHER_PAGES, readTimeout)
  }

  /**
   * Sets the protocol compression (lower-cased) unless it is absent (`null`) or
   * "none" in any letter case; the compression option cannot be set to NONE (default).
   */
  def compressionProperties(b: PDCLB): PDCLB =
    Option(conf.compression)
      .map(_.toLowerCase)
      .filter(_ != "none")
      .fold(b)(b.withString(PROTOCOL_COMPRESSION, _))

  /** Sets the local data center when one is configured; otherwise returns `b` unchanged. */
  def localDCProperty(b: PDCLB): PDCLB =
    conf.localDC.fold(b)(b.withString(LOAD_BALANCING_LOCAL_DATACENTER, _))

  /**
   * Builds the step that applies the host-based settings: contact points as
   * "host:port" strings, the load-balancing policy, and the SSL options.
   */
  def ipBasedConnectionProperties(ipConf: IpBasedContactInfo): PDCLB => PDCLB = (builder: PDCLB) => {
    val sslConf = ipConf.cassandraSSLConf

    // Keep the builder returned by the chain and continue from it, instead of
    // discarding it and relying on `with*` having modified `builder` in place.
    val withContactPoints = builder
      .withStringList(CONTACT_POINTS, ipConf.hosts.map(h => s"${h.getHostString}:${h.getPort}").toList.asJava)
      .withClass(LOAD_BALANCING_POLICY_CLASS, classOf[LocalNodeFirstLoadBalancingPolicy])

    // Key store settings are only forwarded when client authentication is enabled.
    def clientAuthEnabled(value: Option[String]): Option[String] =
      if (sslConf.clientAuthEnabled) value else None

    // add ssl properties if ssl is enabled
    if (sslConf.enabled) {
      val optionalSslSettings = Seq(
        SSL_TRUSTSTORE_PATH -> sslConf.trustStorePath,
        SSL_TRUSTSTORE_PASSWORD -> sslConf.trustStorePassword,
        SSL_KEYSTORE_PATH -> clientAuthEnabled(sslConf.keyStorePath),
        SSL_KEYSTORE_PASSWORD -> clientAuthEnabled(sslConf.keyStorePassword))

      optionalSslSettings
        // Only settings that carry a value are written to the builder.
        .foldLeft(withContactPoints) { case (b, (name, value)) =>
          value.fold(b)(b.withString(name, _))
        }
        .withClass(SSL_ENGINE_FACTORY_CLASS, classOf[DefaultSslEngineFactory])
        .withStringList(SSL_CIPHER_SUITES, sslConf.enabledAlgorithms.toList.asJava)
        .withBoolean(SSL_HOSTNAME_VALIDATION, false) // TODO: this needs to be configurable by users. Set to false for our integration tests
    } else {
      withContactPoints
    }
  }

  // Steps applied regardless of the kind of contact info.
  val universalProperties: Seq[PDCLB => PDCLB] =
    Seq(basicProperties, compressionProperties, localDCProperty)

  // Host-based settings are added only for IP-based contact info.
  val appliedProperties: Seq[PDCLB => PDCLB] = conf.contactInfo match {
    case ipConf: IpBasedContactInfo => universalProperties :+ ipBasedConnectionProperties(ipConf)
    case _ => universalProperties
  }

  // Apply the steps in order, each one receiving the previous step's result.
  appliedProperties.foldLeft(initBuilder) { (builder, applyStep) => applyStep(builder) }
}