in connector/src/main/scala/com/datastax/spark/connector/datasource/CassandraCatalog.scala [364:418]
/**
 * Applies the requested changes to an existing Cassandra table and returns the reloaded table.
 *
 * Only three kinds of change are accepted: `AddColumn`, `DeleteColumn` and
 * `TableChange.SetProperty`. The changes are checked first, then applied with up to three
 * separate `ALTER TABLE` statements, in this fixed order: set options, drop columns, add
 * columns. Each statement is executed on its own and nothing here rolls back an earlier
 * statement if a later one fails.
 *
 * @param ident   identifier of the table to alter; resolved through `getTableMetaData`
 * @param changes the changes to apply
 * @return the table as returned by `loadTable(ident)` after the statements have run
 * @throws CassandraCatalogException if a change is of an unsupported kind, or if a
 *                                   `SetProperty` names a property not in `CassandraProperties`
 */
override def alterTable(ident: Identifier, changes: TableChange*): Table = {
  val protocolVersion = connector.withSessionDo(_.getContext.getProtocolVersion)
  val tableMetadata = getTableMetaData(connector, ident)
  val keyspace = tableMetadata.getKeyspace
  val table = tableMetadata.getName

  // Check for unsupported table changes before doing any other work.
  changes.foreach {
    case _: AddColumn | _: DeleteColumn | _: TableChange.SetProperty =>
    case other: TableChange => throw new CassandraCatalogException(s"Cassandra Catalog does not support Alter operation: ${other.getClass.getSimpleName}")
  }

  // Only properties known to CassandraProperties may be set.
  val propertiesToAdd = changes.collect { case setProperty: TableChange.SetProperty =>
    if (CassandraProperties.contains(setProperty.property())) {
      setProperty
    } else {
      throw new CassandraCatalogException(s"Unable to set unknown Cassandra Property ${setProperty.property()}")
    }
  }

  // checkRemoveNormalColumn validates the request and yields the column name to drop
  // (presumably it rejects non-regular columns -- confirm against the helper).
  val columnsToRemove = changes.collect { case remove: DeleteColumn =>
    checkRemoveNormalColumn(tableMetadata, remove.fieldNames())
  }

  // Pair each new column's checked name with its Java driver type.
  val columnsToAdd = changes.collect { case add: AddColumn =>
    (checkColumnName(add.fieldNames()), sparkSqlToJavaDriverType(add.dataType(), protocolVersion))
  }

  if (propertiesToAdd.nonEmpty) {
    // The cast gives the fold a single accumulator type for the fluent builder.
    val setOptionsStatement = propertiesToAdd.foldLeft(
      SchemaBuilder.alterTable(keyspace, table).asInstanceOf[AlterTableWithOptionsEnd]) { case (alter, prop) =>
      alter.withOption(prop.property(), parseProperty(prop.value()))
    }.asCql()
    logDebug(s"Executing Set Table Properties: $setOptionsStatement")
    connector.withSessionDo(_.execute(setOptionsStatement))
  }

  if (columnsToRemove.nonEmpty) {
    val dropColumnsStatement = SchemaBuilder.alterTable(keyspace, table).dropColumns(columnsToRemove: _*).asCql
    logDebug(s"Executing Drop Table Columns: $dropColumnsStatement")
    connector.withSessionDo(_.execute(dropColumnsStatement))
  }

  if (columnsToAdd.nonEmpty) {
    // foldLeft keeps the columns in the order the caller listed them, matching how the
    // table properties above are folded; a right fold would emit them reversed.
    val addColumnStatement = columnsToAdd.foldLeft(
      SchemaBuilder.alterTable(keyspace, table).asInstanceOf[AlterTableAddColumn]
    ) { case (alterBuilder, (colName, dataType)) =>
      alterBuilder.addColumn(colName, dataType)
    }.asInstanceOf[AlterTableAddColumnEnd].asCql()
    logDebug(s"Executing Add Table Columns: $addColumnStatement")
    connector.withSessionDo(_.execute(addColumnStatement))
  }

  logDebug("Table Alteration Complete")
  loadTable(ident)
}