in connector/src/main/scala/com/datastax/spark/connector/datasource/CassandraCatalog.scala [171:202]
/**
 * Alters an existing keyspace by applying the given property changes and issuing the
 * resulting ALTER KEYSPACE statement through the connector's session.
 *
 * The namespace's current metadata is loaded, each change is folded into a copy of it, and
 * the final property map is translated into durable-writes and replication settings.
 *
 * @param namespace namespace identifier (validated by `checkNamespace`); its first element
 *                  is used as the keyspace name
 * @param changes   changes to apply; only `SetProperty` and `RemoveProperty` are handled
 * @throws CassandraCatalogException if a change of any other kind is supplied, if the
 *                                   replication class option is absent, if SimpleStrategy is
 *                                   selected without a replication factor, or if the
 *                                   replication strategy is not recognised
 */
override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit = {
  checkNamespace(namespace)

  val currentProperties: mutable.Map[String, String] = loadNamespaceMetadata(namespace).asScala
  // Every step works on a fresh copy, so the loaded metadata map itself is never modified.
  val updatedProperties: mutable.Map[String, String] =
    changes.foldLeft(currentProperties) { (properties, change) =>
      change match {
        case set: SetProperty =>
          properties.clone() += (set.property() -> set.value)
        case remove: RemoveProperty =>
          properties.clone() -= remove.property()
        case other =>
          throw new CassandraCatalogException(s"Unable to handle alter namespace operation: ${other.getClass.getSimpleName}")
      }
    }

  val keyspaceAlteration = SchemaBuilder.alterKeyspace(namespace.head)
  val durableWrites = updatedProperties.getOrElse(DurableWrites, "True").toBoolean
  val durableAlteration = keyspaceAlteration.withDurableWrites(durableWrites)

  // Only the part after the last dot is kept, so a fully qualified strategy class name
  // and a bare strategy name are treated alike.
  val strategyName = updatedProperties.get(ReplicationClass) match {
    case Some(strategyClass) => strategyClass.split("\\.").last
    case None => throw new CassandraCatalogException(s"Altering a keyspace requires a $ReplicationClass option")
  }

  val replicatedAlteration = strategyName.toLowerCase(Locale.ROOT) match {
    case SimpleStrategy =>
      val factor = updatedProperties.get(ReplicationFactor) match {
        case Some(value) => value
        case None => throw new CassandraCatalogException(s"Need a $ReplicationFactor option with SimpleStrategy")
      }
      durableAlteration.withSimpleStrategy(factor.toInt)
    case NetworkTopologyStrategy =>
      // Every property that is not one of the ignored options is passed on as a
      // (name -> replication count) entry.
      val perDatacenter = (updatedProperties -- IgnoredReplicationOptions).map {
        case (datacenter, count) => (datacenter, count.toInt: java.lang.Integer)
      }
      durableAlteration.withNetworkTopologyStrategy(perDatacenter.asJava)
    case other =>
      throw new CassandraCatalogException(s"Unknown replication strategy $other")
  }

  connector.withSessionDo { session =>
    session.execute(replicatedAlteration.asCql())
  }
}