in connector/src/main/scala/com/datastax/spark/connector/datasource/CassandraSourceUtil.scala [80:123]
/**
 * Builds the effective SparkConf for one cluster/keyspace by layering settings on top of a
 * copy of `sparkConf`.
 *
 * For every known connector property name (regular, deprecated, plus whatever the selected
 * connection factory reports through `properties`) the first defined value wins, searched in
 * this order:
 *   1. `userOptions`, under the lower-cased property name
 *   2. `sqlConf`, under `cluster:keyspace/prop`
 *   3. `sqlConf`, under `cluster/prop`
 *   4. `sqlConf`, under `default/prop`
 *   5. `sqlConf`, under the bare `prop`
 * If none of them is defined, the value carried over from `sparkConf` (if any) is left as is.
 *
 * Afterwards every `sqlConf` entry whose key is not one of the scoped keys above is copied
 * over, followed by every `userOptions` entry whose key is not a connector property name
 * (neither as declared nor lower-cased).
 *
 * @param sparkConf   base configuration; it is cloned and the clone is what gets modified
 * @param sqlConf     SQL-level settings, optionally scoped by cluster and keyspace
 * @param cluster     cluster name used to build the scoped lookup keys
 * @param keyspace    keyspace name used to build the most specific lookup key
 * @param userOptions per-call options; looked up with lower-cased keys
 * @return the merged configuration
 */
def consolidateConfs(
    sparkConf: SparkConf,
    sqlConf: Map[String, String],
    cluster: String = "default",
    keyspace: String = "",
    userOptions: Map[String, String] = Map.empty): SparkConf = {

  // Every sqlConf key a property may be stored under, most specific scope first.
  def scopedSqlKeys(prop: String): Seq[String] = Seq(
    s"$cluster:$keyspace/$prop",
    s"$cluster/$prop",
    s"default/$prop",
    prop)

  // First defined value for a property: user options beat any sqlConf scope.
  def resolve(prop: String): Option[String] = {
    // userOptions is actually a case insensitive map, so lower case keys must be used
    val fromUserOptions = userOptions.get(prop.toLowerCase(Locale.ROOT))
    fromUserOptions.orElse(scopedSqlKeys(prop).flatMap(key => sqlConf.get(key)).headOption)
  }

  // Start from the defaults carried by the incoming SparkConf.
  val merged = sparkConf.clone()

  // A custom connection factory may support extra properties of its own, so the factory
  // has to be resolved before the full list of property names is known.
  val connectionFactoryName = resolve(CassandraConnectionFactory.FactoryParam.name)
  val factoryProperties =
    CassandraConnectionFactory.fromNameOrDefault(connectionFactoryName).properties
  val sccPropertyNames =
    ConfigParameter.names ++ DeprecatedConfigParameter.names ++ factoryProperties

  // Keyspace/cluster level settings for the known connector properties.
  sccPropertyNames.foreach { name =>
    resolve(name).foreach(value => merged.set(name, value))
  }

  // Remaining sqlConf entries, skipping every key already handled by the scoped lookup.
  val handledSqlKeys = sccPropertyNames.flatMap(name => scopedSqlKeys(name))
  merged.setAll(sqlConf -- handledSqlKeys)

  // Remaining user options, skipping connector properties in declared or lower-cased form.
  val sccNamesAnyCase = sccPropertyNames ++ sccPropertyNames.map(_.toLowerCase(Locale.ROOT))
  merged.setAll(userOptions -- sccNamesAnyCase)
  merged
}