in connector/src/main/scala/com/datastax/spark/connector/DatasetFunctions.scala [66:119]
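/**
 * Creates a Cassandra table whose columns are derived from the schema of the
 * wrapped Dataset. The named partition key and clustering columns (in the
 * listed order, each with its sorting order) form the primary key; every
 * remaining schema column becomes a regular column. Throws
 * IllegalArgumentException if a named column is absent from the schema.
 */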
def createCassandraTableEx(
    keyspaceName: String,
    tableName: String,
    partitionKeyColumns: Seq[String],
    clusteringKeyColumns: Seq[(String, ClusteringColumn.SortingOrder)],
    ifNotExists: Boolean = false,
    tableOptions: Map[String, String] = Map())(
    implicit connector: CassandraConnector = CassandraConnector(sparkContext)): Unit = {
  // Derive a table definition from the Dataset's schema, then partition its
  // column names into partition key, clustering, and regular columns.
  val protocolVersion = connector.withSessionDo(_.getContext.getProtocolVersion)
  val rawTable = new DataFrameColumnMapper(dataset.schema).newTable(keyspaceName, tableName, protocolVersion)
  val columnMapping = rawTable.columnByName
  val columnNames = columnMapping.keys.toSet
  val partitionKeyNames = partitionKeyColumns
  val clusteringKeyNames = clusteringKeyColumns.map(_._1)
  val regularColumnNames = (columnNames -- (partitionKeyNames ++ clusteringKeyNames)).toSeq
  def missingColumnException(columnName: String, columnType: String) = {
    new IllegalArgumentException(
      s""""$columnName" not found. Unable to make specified column $columnName a $columnType.
         |Available Columns: $columnNames""".stripMargin)
  }
  // Rebuild the table definition with every column assigned its proper role;
  // an unknown column name in either key list fails fast with a clear error.
  val table = rawTable.copy(
    partitionKey = partitionKeyNames
      .map(name => columnMapping.getOrElse(name,
        throw missingColumnException(name, "Partition Key Column")))
      .map(_.copy(columnRole = PartitionKeyColumn)),
    clusteringColumns = clusteringKeyColumns
      .map { case (name, sortingOrder) =>
        (columnMapping.getOrElse(name,
          throw missingColumnException(name, "Clustering Column")), sortingOrder)
      }
      .zipWithIndex
      .map { case ((column, sortingOrder), index) =>
        column.copy(columnRole = ClusteringColumn(index, sortingOrder))
      },
    regularColumns = regularColumnNames
      .map(name => columnMapping.getOrElse(name,
        throw missingColumnException(name, "Regular Column")))
      .map(_.copy(columnRole = RegularColumn)),
    ifNotExists = ifNotExists,
    tableOptions = tableOptions)
  // Render the table definition as a CREATE TABLE statement and execute it.
  connector.withSessionDo(session => session.execute(table.cql))
}
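
For context, a minimal usage sketch (not part of this file). It assumes the implicit Dataset-to-DatasetFunctions conversion is in scope via com.datastax.spark.connector._, that ClusteringColumn lives in com.datastax.spark.connector.cql, and that a Cassandra node is reachable at the configured contact point; the Event case class, the "analytics" keyspace, and all table/column names are hypothetical.

import org.apache.spark.sql.SparkSession
import com.datastax.spark.connector._                    // implicit Dataset -> DatasetFunctions conversion (assumed)
import com.datastax.spark.connector.cql.ClusteringColumn // assumed location of ClusteringColumn.SortingOrder

case class Event(tenant_id: String, event_time: Long, event_type: String)

val spark = SparkSession.builder()
  .appName("create-table-example")
  .master("local[*]")
  .config("spark.cassandra.connection.host", "127.0.0.1") // hypothetical contact point
  .getOrCreate()
import spark.implicits._

val events = Seq(Event("tenant-1", 1L, "login")).toDS()

// tenant_id becomes the sole partition key column, event_time a descending
// clustering column; event_type is picked up as a regular column.
events.createCassandraTableEx(
  "analytics",
  "tenant_events",
  partitionKeyColumns = Seq("tenant_id"),
  clusteringKeyColumns = Seq(("event_time", ClusteringColumn.Descending)),
  ifNotExists = true)

With these arguments, table.cql should render roughly as CREATE TABLE IF NOT EXISTS analytics.tenant_events (..., PRIMARY KEY ((tenant_id), event_time)) with a DESC clustering order on event_time, since ifNotExists and the column roles feed directly into the generated statement.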