override def alterNamespace()

in connector/src/main/scala/com/datastax/spark/connector/datasource/CassandraCatalog.scala [171:202]


  /**
    * Applies the given namespace changes to a keyspace and issues the resulting
    * `ALTER KEYSPACE` statement.
    *
    * The currently loaded namespace metadata is used as the starting point; each
    * change is replayed on top of it and the complete resulting option set
    * (durable writes + replication) is sent, not just the changed keys.
    *
    * @param namespace namespace identifier; its first element is used as the keyspace name
    * @param changes   changes to apply, in order; only `SetProperty` and `RemoveProperty`
    *                  are handled
    * @throws CassandraCatalogException if a change is of an unhandled kind, if the resulting
    *                                   options lack a replication class, if `SimpleStrategy`
    *                                   is selected without a replication factor, or if the
    *                                   replication strategy is not recognised
    */
  override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit = {
    checkNamespace(namespace)

    // Replay every requested change on top of the metadata currently loaded for the namespace.
    // A SetProperty is applied to a clone, so the accumulator passed in is not modified by it.
    val currentProps = loadNamespaceMetadata(namespace).asScala
    val updatedProps: mutable.Map[String, String] = changes.foldLeft(currentProps) {
      case (props: mutable.Map[String, String], setChange: SetProperty) =>
        props.clone() += (setChange.property() -> setChange.value())
      case (props: mutable.Map[String, String], removeChange: RemoveProperty) =>
        props - removeChange.property()
      case (_, unsupported) =>
        throw new CassandraCatalogException(
          s"Unable to handle alter namespace operation: ${unsupported.getClass.getSimpleName}")
    }

    // Durable writes fall back to "True" when the option is absent.
    val keyspaceAlter = SchemaBuilder
      .alterKeyspace(namespace.head)
      .withDurableWrites(updatedProps.getOrElse(DurableWrites, "True").toBoolean)

    // Only the last dot-separated segment of the class is considered, so a fully qualified
    // strategy class name is reduced to its simple name before the lower-cased comparison.
    val strategyClass = updatedProps.getOrElse(
      ReplicationClass,
      throw new CassandraCatalogException(s"Altering a keyspace requires a $ReplicationClass option"))
    val strategyName = strategyClass.split("\\.").last.toLowerCase(Locale.ROOT)

    val alterStatement = strategyName match {
      case SimpleStrategy =>
        val factor = updatedProps.getOrElse(
          ReplicationFactor,
          throw new CassandraCatalogException(s"Need a $ReplicationFactor option with SimpleStrategy"))
        keyspaceAlter.withSimpleStrategy(factor.toInt)

      case NetworkTopologyStrategy =>
        // Every option that is not explicitly ignored is treated as
        // a "datacenter name -> replication factor" entry.
        val perDatacenter = (updatedProps -- IgnoredReplicationOptions).map {
          case (datacenter, factor) => (datacenter, factor.toInt: java.lang.Integer)
        }
        keyspaceAlter.withNetworkTopologyStrategy(perDatacenter.asJava)

      case unknown =>
        throw new CassandraCatalogException(s"Unknown replication strategy $unknown")
    }

    // The CQL text is rendered inside the session callback, right before execution.
    connector.withSessionDo { session =>
      session.execute(alterStatement.asCql())
    }
  }