override def createTable()

in connector/src/main/scala/com/datastax/spark/connector/datasource/CassandraCatalog.scala [246:334]


  override def createTable(ident: Identifier, schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): Table = {
    val tableProps = properties.asScala
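    //Check whether the table already exists; only a NoSuchTableException means we are clear to create it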
    Try(getTableMetaData(connector, ident)) match {
      case Success(_) => throw new TableAlreadyExistsException(ident)
      case Failure(_: NoSuchTableException) => //The table does not exist yet, so we can create it
      case Failure(e) => throw e
    }

    //There is an implicit for this but it's only accessible in org.apache.spark.sql.catalog (maybe we should use it)
    val invalidPartitions = partitions.filter(_.name() != "identity")
    if (invalidPartitions.nonEmpty) {
      throw new UnsupportedOperationException(s"Cassandra tables can only be partitioned based on direct references to columns, found: ${invalidPartitions.mkString(",")}")
    }

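    //Partition keys come either from the PARTITIONED BY clause (identity transforms) or from the PartitionKey table property, never both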
    val providedPartitionKeyNames = partitions.map(_.references().head.fieldNames().head)

    val partitionKeys = providedPartitionKeyNames match {
      case partitionKeyNames: Array[String] if partitionKeyNames.nonEmpty && tableProps.contains(PartitionKey) =>
        throw new CassandraCatalogException(s"Table property $PartitionKey is specified as well as PARTITIONED BY clause, please use just one")
      case partitionKeyNames: Array[String] if partitionKeyNames.nonEmpty =>
        partitionKeyNames
      case _: Array[String] if tableProps.contains(PartitionKey) =>
        tableProps(PartitionKey).split(",").map(_.replaceAll("\\s", ""))
      case _ =>
        throw new CassandraCatalogException(s"Cassandra tables require partition keys, defined either in the table property $PartitionKey or with a PARTITIONED BY clause")
    }

    val partitionKeyNames = partitionKeys.map(fromInternal)

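    //Clustering columns are parsed from the ClusteringKey property as comma-separated "name" or "name.order" entries; order defaults to ASC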
    val clusteringKeyNames = tableProps
      .get(ClusteringKey).toSeq
      .flatMap(value => value.split(",").map(_.replaceAll("\\s", "").split("\\.")))
      .map {
        case Array(name, order) =>
          val clusteringOrder = Try(ClusteringOrder.valueOf(order.toUpperCase()))
            .getOrElse(throw new CassandraCatalogException(s"Invalid clustering order found in ${name}.${order}, must be ASC or DESC or blank"))
          (fromInternal(name), clusteringOrder)
        case Array(name) =>
          (fromInternal(name), ClusteringOrder.ASC)
        case invalid =>
          throw new CassandraCatalogException(s"Unable to parse clustering column ${invalid.mkString(".")}, too many components")
      }

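    //Map every Spark SQL field to its Java driver DataType; the negotiated protocol version is taken into account when choosing the driver type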
    val protocolVersion = connector.withSessionDo(_.getContext.getProtocolVersion)

    val columnToType = schema.fields.map(sparkField =>
      (fromInternal(sparkField.name), sparkSqlToJavaDriverType(sparkField.dataType, protocolVersion))
    ).toMap

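    //The identifier's namespace must resolve to a single keyspace; convert keyspace and table names into CQL identifiers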
    checkNamespace(ident.namespace())
    val namespace = fromInternal(ident.namespace.head)
    val table = fromInternal(ident.name())

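    //Assemble the CREATE TABLE statement with the driver's SchemaBuilder: partition keys first, then clustering columns, then regular columns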
    val createTableStart: OngoingPartitionKey = SchemaBuilder.createTable(namespace, table)

    val createTableWithPk: CreateTable = partitionKeyNames.foldLeft(createTableStart) { (createTable, pkName) =>
      val dataType = columnToType.getOrElse(pkName,
        throw new CassandraCatalogException(s"$pkName was defined as a partition key but it does not exist in the schema ${schema.fieldNames.mkString(",")}"))
      createTable.withPartitionKey(pkName, dataType).asInstanceOf[OngoingPartitionKey]
    }.asInstanceOf[CreateTable]

    val createTableWithClustering = clusteringKeyNames.foldLeft(createTableWithPk) { case (createTable, (ckName, ckOrder)) =>
      val dataType =
        columnToType.get(fromInternal(ckName.asInternal().toLowerCase(Locale.ROOT))) //Check for the lower-cased column name as well
          .orElse(columnToType.get(ckName))
          .getOrElse(throw new CassandraCatalogException(s"$ckName was defined as a clustering key but it does not exist in the schema ${schema.fieldNames.mkString(",")}"))
      createTable
        .withClusteringColumn(ckName, dataType)
        .withClusteringOrder(ckName, ckOrder)
        .asInstanceOf[CreateTable]
    }

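    //Every schema field that is neither a partition key nor a clustering column is added as a regular column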
    val normalColumns = schema.fieldNames.map(fromInternal).toSet -- (clusteringKeyNames.map(_._1) ++ partitionKeyNames)

    val createTableWithColumns = normalColumns.foldLeft(createTableWithClustering) { (createTable, colName) =>
      val dataType = columnToType(colName)
      createTable.withColumn(colName, dataType)
    }

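    //Only recognized Cassandra table options are applied to the statement; all other properties are logged and ignored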
    val (userProperties, unusedProperties) = tableProps.partition { case (key, _) => CassandraProperties.contains(key) }
    logInfo(s"Ignoring non-Cassandra table properties: $unusedProperties")
    val createTableWithProperties = userProperties.foldLeft(createTableWithColumns) {
      case (createStmt, (key, value)) => createStmt.withOption(key, parseProperty(value)).asInstanceOf[CreateTable]
    }

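    //Execute the generated CQL and hand back the newly created table via loadTable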
    connector.withSessionDo(_.execute(createTableWithProperties.asCql()))

    loadTable(ident)
  }
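
For context, below is a minimal, hypothetical sketch of how this method is typically reached through Spark SQL. It assumes the catalog is registered under the name cass and that the PartitionKey and ClusteringKey constants referenced above resolve to the property keys partition_key and clustering_key; the keyspace, table, and column names are illustrative.

  //Hypothetical usage sketch only: catalog name, keyspace, table, columns and property keys are assumptions
  spark.conf.set("spark.sql.catalog.cass", "com.datastax.spark.connector.datasource.CassandraCatalog")

  //CREATE TABLE is routed to CassandraCatalog.createTable(); PARTITIONED BY supplies the partition key,
  //while the clustering column and its sort order are passed through TBLPROPERTIES
  spark.sql(
    """CREATE TABLE cass.test_ks.events (
      |  device_id INT,
      |  event_time TIMESTAMP,
      |  payload STRING
      |) USING cassandra
      |PARTITIONED BY (device_id)
      |TBLPROPERTIES ('clustering_key'='event_time.desc')
      |""".stripMargin)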