override def alterTable()

in connector/src/main/scala/com/datastax/spark/connector/datasource/CassandraCatalog.scala [364:418]


  /**
   * Applies the requested changes to an existing Cassandra table and returns the reloaded table.
   *
   * Only three kinds of change are accepted: adding a column, dropping a column and setting a
   * table property. The unsupported-kind check, the unknown-property check and the per-column
   * helper calls all run before the first ALTER statement is executed. Statements are then
   * issued separately, in this order: set properties, drop columns, add columns.
   *
   * @param ident   identifier of the table to alter
   * @param changes the changes to apply
   * @return the table as returned by `loadTable(ident)` once all statements have run
   * @throws CassandraCatalogException if a change is of an unsupported kind, or if a property
   *                                   being set is not contained in `CassandraProperties`
   */
  override def alterTable(ident: Identifier, changes: TableChange*): Table = {
    val protocolVersion = connector.withSessionDo(session => session.getContext.getProtocolVersion)
    val metadata = getTableMetaData(connector, ident)
    val keyspaceName = metadata.getKeyspace
    val tableName = metadata.getName

    // Reject every change kind other than add column, drop column or set property.
    changes.foreach {
      case _: AddColumn | _: DeleteColumn | _: TableChange.SetProperty => ()
      case unsupported: TableChange =>
        throw new CassandraCatalogException(s"Cassandra Catalog does not support Alter operation: ${unsupported.getClass.getSimpleName}")
    }

    // Property changes are kept as-is once their name is known to CassandraProperties.
    val propertyChanges = changes.collect { case change: TableChange.SetProperty =>
      if (!CassandraProperties.contains(change.property())) {
        throw new CassandraCatalogException(s"Unable to set unknown Cassandra Property ${change.property()}")
      }
      change
    }

    val droppedColumns = changes.collect { case drop: DeleteColumn =>
      checkRemoveNormalColumn(metadata, drop.fieldNames())
    }

    val addedColumns = changes.collect { case add: AddColumn =>
      val columnName = checkColumnName(add.fieldNames())
      val columnType = sparkSqlToJavaDriverType(add.dataType(), protocolVersion)
      (columnName, columnType)
    }

    if (propertyChanges.nonEmpty) {
      // The cast exposes the option-setting view of the builder so it can be threaded through the fold.
      val optionsBuilder = SchemaBuilder.alterTable(keyspaceName, tableName).asInstanceOf[AlterTableWithOptionsEnd]
      val setOptionsStatement = propertyChanges
        .foldLeft(optionsBuilder) { (builder, change) =>
          builder.withOption(change.property(), parseProperty(change.value()))
        }
        .asCql()
      logDebug(s"Executing Set Table Properties: $setOptionsStatement")
      connector.withSessionDo(session => session.execute(setOptionsStatement))
    }

    if (droppedColumns.nonEmpty) {
      val dropBuilder = SchemaBuilder.alterTable(keyspaceName, tableName)
      val dropColumnsStatement = dropBuilder.dropColumns(droppedColumns: _*).asCql()
      logDebug(s"Executing Drop Table Columns: $dropColumnsStatement")
      connector.withSessionDo(session => session.execute(dropColumnsStatement))
    }

    if (addedColumns.nonEmpty) {
      val addBuilder = SchemaBuilder.alterTable(keyspaceName, tableName).asInstanceOf[AlterTableAddColumn]
      // Columns are appended to the statement last-to-first, i.e. in reverse of the order requested.
      val addColumnStatement = addedColumns.reverse
        .foldLeft(addBuilder) { case (builder, (columnName, columnType)) =>
          builder.addColumn(columnName, columnType)
        }
        .asInstanceOf[AlterTableAddColumnEnd]
        .asCql()
      logDebug(s"Executing Add Table Columns: $addColumnStatement")
      connector.withSessionDo(session => session.execute(addColumnStatement))
    }

    logDebug("Table Alteration Complete")
    loadTable(ident)
  }