in connector/src/main/scala/com/datastax/spark/connector/datasource/CassandraCatalog.scala [246:334]
override def createTable(ident: Identifier, schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): Table = {
val tableProps = properties.asScala
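// Fail fast if the table already exists; a NoSuchTableException from the metadata lookup means we are clear to create it.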
Try(getTableMetaData(connector, ident)) match {
case Success(_) => throw new TableAlreadyExistsException(ident)
case Failure(_: NoSuchTableException) => // The table does not exist, so we can create it
case Failure(e) => throw e
}
// There is an implicit for this, but it's only accessible in org.apache.spark.sql.catalog (maybe we should use it)
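// Cassandra can only partition on direct column references, i.e. identity transforms.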
val invalidPartitions = partitions.filter(_.name() != "identity")
if (invalidPartitions.nonEmpty) {
throw new UnsupportedOperationException(s"Cassandra Tables can only be partitioned based on direct references to columns, found: ${invalidPartitions.mkString(",")}")
}
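// Resolve the partition key: it may come from the PARTITIONED BY clause or from the table property, but not both.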
val providedPartitionKeyNames = partitions.map(_.references().head.fieldNames().head)
val partitionKeys = providedPartitionKeyNames match {
case partitionKeyNames: Array[String] if partitionKeyNames.nonEmpty && tableProps.contains(PartitionKey) =>
throw new CassandraCatalogException(s"Table property $PartitionKey is specified as well as PARTITIONED BY clause, please use just one")
case partitionKeyNames: Array[String] if partitionKeyNames.nonEmpty =>
partitionKeyNames
case _: Array[String] if tableProps.contains(PartitionKey) =>
tableProps(PartitionKey).split(",").map(_.replaceAll("\\s", ""))
case _ =>
throw new CassandraCatalogException(s"Cassandra Tables need partition keys defined in property $PartitionKey or with 'PARTITIONED BY columns")
}
val partitionKeyNames = partitionKeys.map(fromInternal)
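// Clustering columns come from the table property as a comma-separated list of "name" or "name.order"
// entries, e.g. "cc1.desc, cc2" parses to Seq((cc1, DESC), (cc2, ASC)).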
val clusteringKeyNames = tableProps
.get(ClusteringKey).toSeq
.flatMap(value => value.split(",").map(_.replaceAll("\\s", "").split("\\.")))
.map {
case Array(name, order) =>
val clusteringOrder = Try(ClusteringOrder.valueOf(order.toUpperCase()))
.getOrElse(throw new CassandraCatalogException(s"Invalid clustering order found in ${name}.${order}, must be ASC or DESC or blank"))
(fromInternal(name), clusteringOrder)
case Array(name) =>
(fromInternal(name), ClusteringOrder.ASC)
case invalid =>
throw new CassandraCatalogException(s"Unable to parse clustering column ${invalid}, too many components")
}
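// Resolve each Spark SQL field to the corresponding Java driver type; which types are available depends on the negotiated protocol version.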
val protocolVersion = connector.withSessionDo(_.getContext.getProtocolVersion)
val columnToType = schema.fields.map(sparkField =>
(fromInternal(sparkField.name), sparkSqlToJavaDriverType(sparkField.dataType, protocolVersion))
).toMap
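// The identifier's namespace must be a single element naming the keyspace.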
checkNamespace(ident.namespace())
val namespace = fromInternal(ident.namespace.head)
val table = fromInternal(ident.name())
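// Build the CREATE TABLE statement incrementally, starting with the partition key columns.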
val createTableStart: OngoingPartitionKey = SchemaBuilder.createTable(namespace, table)
val createTableWithPk: CreateTable = partitionKeyNames.foldLeft(createTableStart) { (createTable, pkName) =>
val dataType = columnToType.getOrElse(pkName,
throw new CassandraCatalogException(s"$pkName was defined as a partition key but it does not exist in the schema ${schema.fieldNames.mkString(",")}"))
createTable.withPartitionKey(pkName, dataType).asInstanceOf[OngoingPartitionKey]
}.asInstanceOf[CreateTable]
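// Then add the clustering columns together with their clustering order.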
val createTableWithClustering = clusteringKeyNames.foldLeft(createTableWithPk) { case (createTable, (ckName, ckOrder)) =>
val dataType =
columnToType.get(fromInternal(ckName.asInternal().toLowerCase(Locale.ROOT))) // Check the lower-cased column name as well
.orElse(columnToType.get(ckName))
.getOrElse(throw new CassandraCatalogException(s"${ckName.asInternal()} was defined as a clustering key but it does not exist in the schema ${schema.fieldNames.mkString(",")}"))
createTable
.withClusteringColumn(ckName, dataType)
.withClusteringOrder(ckName, ckOrder)
.asInstanceOf[CreateTable]
}
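// Everything that is neither a partition key nor a clustering column becomes a regular column.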
val normalColumns = schema.fieldNames.map(fromInternal).toSet -- (clusteringKeyNames.map(_._1) ++ partitionKeyNames)
val createTableWithColumns = normalColumns.foldLeft(createTableWithClustering) { (createTable, colName) =>
val dataType = columnToType(colName)
createTable.withColumn(colName, dataType)
}
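// Forward recognized Cassandra options as CQL WITH options; anything else is logged and dropped.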
val (cassandraProperties, unusedProperties) = tableProps.partition { case (key, _) => CassandraProperties.contains(key) }
logInfo(s"Ignoring non-Cassandra properties for table: $unusedProperties")
val createTableWithProperties = cassandraProperties.foldLeft(createTableWithColumns) {
case (createStmt, (key, value)) => createStmt.withOption(key, parseProperty(value)).asInstanceOf[CreateTable]
}
connector.withSessionDo(_.execute(createTableWithProperties.asCql()))
loadTable(ident)
}
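// Illustrative usage (a sketch, not part of the original source): assuming this catalog is registered
// as `cass` via spark.sql.catalog.cass=com.datastax.spark.connector.datasource.CassandraCatalog, a
// Spark SQL statement like the one below exercises createTable. The TBLPROPERTIES key "clustering_key"
// is assumed to match the ClusteringKey constant referenced above.
//
//   CREATE TABLE cass.ks.example (key INT, ts TIMESTAMP, value STRING)
//     USING cassandra
//     PARTITIONED BY (key)
//     TBLPROPERTIES (clustering_key='ts.desc')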