def createCassandraTableEx()

in connector/src/main/scala/com/datastax/spark/connector/DatasetFunctions.scala [66:119]


  /**
    * Creates a Cassandra table whose columns are derived from this dataset's schema, with the
    * primary key chosen explicitly by the caller.
    *
    * Column definitions are obtained from `DataFrameColumnMapper` applied to `dataset.schema`.
    * Each column is then given the partition-key, clustering or regular role according to the
    * arguments, and the CQL of the resulting table definition is executed through `connector`.
    *
    * @param keyspaceName         keyspace name passed to the column mapper for the new table
    * @param tableName            table name passed to the column mapper for the new table
    * @param partitionKeyColumns  names of the columns forming the partition key, in key order;
    *                             must not be empty
    * @param clusteringKeyColumns names and sorting orders of the clustering columns; the position
    *                             in this sequence becomes the clustering column index
    * @param ifNotExists          copied into the table definition's `ifNotExists` field
    * @param tableOptions         copied into the table definition's `tableOptions` field
    * @param connector            connector used to obtain sessions; defaults to one built from
    *                             `sparkContext`
    * @throws IllegalArgumentException if no partition key column is given, if a column is named
    *                                  more than once in the primary key, or if a key column is
    *                                  not among the columns mapped from the dataset schema
    */
  def createCassandraTableEx(
    keyspaceName: String,
    tableName: String,
    partitionKeyColumns: Seq[String],
    clusteringKeyColumns: Seq[(String, ClusteringColumn.SortingOrder)],
    ifNotExists: Boolean = false,
    tableOptions: Map[String, String] = Map())(
  implicit
    connector: CassandraConnector = CassandraConnector(sparkContext)): Unit = {

    val clusteringKeyNames = clusteringKeyColumns.map(_._1)
    val keyColumnNames = partitionKeyColumns ++ clusteringKeyNames

    // Validate the key specification before touching the cluster: these mistakes can never
    // yield a valid table and would otherwise only surface as a server-side CQL error.
    require(partitionKeyColumns.nonEmpty,
      "At least one partition key column is required to create a Cassandra table")
    val repeatedKeyColumns = keyColumnNames.diff(keyColumnNames.distinct).distinct
    require(repeatedKeyColumns.isEmpty,
      s"Primary key columns must be specified only once, but found repeated: ${repeatedKeyColumns.mkString(", ")}")

    val protocolVersion = connector.withSessionDo(_.getContext.getProtocolVersion)

    val rawTable = new DataFrameColumnMapper(dataset.schema).newTable(keyspaceName, tableName, protocolVersion)
    val columnMapping = rawTable.columnByName

    val columnNames = columnMapping.keys.toSet
    val regularColumnNames = (columnNames -- keyColumnNames).toSeq

    def missingColumnException(columnName: String, columnType: String) = {
      new IllegalArgumentException(
        s""""$columnName" not Found. Unable to make specified column $columnName a $columnType.
          |Available Columns: $columnNames""".stripMargin)
    }

    // Looks up a caller-specified key column, failing with a descriptive error if it is unknown.
    def keyColumn(columnName: String, columnType: String) =
      columnMapping.getOrElse(columnName, throw missingColumnException(columnName, columnType))

    val table = rawTable.copy(
      partitionKey = partitionKeyColumns
        .map(keyColumn(_, "Partition Key Column"))
        .map(_.copy(columnRole = PartitionKeyColumn)),
      clusteringColumns = clusteringKeyColumns
        .map { case (name, order) => (keyColumn(name, "Clustering Column"), order) }
        .zipWithIndex
        .map { case ((column, order), index) => column.copy(columnRole = ClusteringColumn(index, order)) },
      // Regular column names are taken from columnMapping's own key set, so the lookup cannot miss.
      regularColumns = regularColumnNames
        .map(columnMapping)
        .map(_.copy(columnRole = RegularColumn)),
      ifNotExists = ifNotExists,
      tableOptions = tableOptions
    )

    connector.withSessionDo(session => session.execute(table.cql))
  }