in spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala [869:968]
/**
 * Builds a Hive-style CREATE TABLE DDL statement for the given catalog table.
 *
 * @param projectName     project (database) the table belongs to; emitted unquoted
 * @param tableName       table name; emitted backquoted
 * @param schema          column and partition-column definitions
 * @param ifNotExists     whether to add an IF NOT EXISTS clause
 * @param tableDefinition catalog metadata (table type, comment, bucketing,
 *                        storage descriptor, table properties)
 * @return the DDL string, terminated with ';'
 */
private def getSQLString(
    projectName: String,
    tableName: String,
    schema: TableSchema,
    ifNotExists: Boolean,
    tableDefinition: CatalogTable): String = {
  val sb = new StringBuilder
  // CREATE [EXTERNAL] TABLE [IF NOT EXISTS] project.`table` (
  if (tableDefinition.tableType != CatalogTableType.EXTERNAL) {
    sb.append("CREATE TABLE ")
  } else {
    sb.append("CREATE EXTERNAL TABLE ")
  }
  if (ifNotExists) {
    sb.append(" IF NOT EXISTS ")
  }
  sb.append(projectName).append(".`").append(tableName).append("` (")

  // Column list: `name` type [COMMENT '...'], comma separated.
  val columns = schema.getColumns
  var colIdx = 0
  while (colIdx < columns.size) {
    val col = columns.get(colIdx)
    sb.append("`").append(col.getName).append("` ").append(col.getTypeInfo.getTypeName)
    if (col.getComment != null) sb.append(" COMMENT '").append(col.getComment).append("'")
    if (colIdx + 1 < columns.size) sb.append(',')
    colIdx += 1
  }
  sb.append(')')

  // Table-level comment. NOTE(review): comments are interpolated without quote
  // escaping; a comment containing a single quote yields malformed DDL — confirm
  // that callers sanitize comments upstream.
  tableDefinition.comment.foreach(comment => sb.append(" COMMENT '" + comment + "' "))

  // PARTITIONED BY (name type [COMMENT '...'], ...) — partition column names are
  // intentionally left unquoted, matching the original output.
  val partCols = schema.getPartitionColumns
  if (partCols.size > 0) {
    sb.append(" PARTITIONED BY (")
    var partIdx = 0
    while (partIdx < partCols.size) {
      val c = partCols.get(partIdx)
      sb.append(c.getName).append(" ").append(c.getTypeInfo.getTypeName)
      if (c.getComment != null) {
        sb.append(" COMMENT '").append(c.getComment).append("'")
      }
      if (partIdx + 1 < partCols.size) {
        sb.append(',')
      }
      partIdx += 1
    }
    sb.append(')')
  }

  // CLUSTERED BY (...) [SORTED BY (...)] INTO n BUCKETS
  tableDefinition.bucketSpec.foreach { bucketSpec =>
    sb.append(" CLUSTERED BY ")
    sb.append(bucketSpec.bucketColumnNames.mkString("(", ",", ")"))
    // BUG FIX: the old code tested the already-joined string for emptiness, but an
    // empty sort-column list joins to "()" (non-empty), so invalid "SORTED BY ()"
    // was always emitted. Test the column list itself instead.
    if (bucketSpec.sortColumnNames.nonEmpty) {
      sb.append(" SORTED BY ").append(bucketSpec.sortColumnNames.mkString("(", ",", ")"))
    }
    sb.append(" INTO ").append(bucketSpec.numBuckets).append(" BUCKETS")
  }

  // Storage clause — only emitted for external tables.
  if (tableDefinition.tableType == CatalogTableType.EXTERNAL) {
    require(tableDefinition.storage.locationUri.isDefined)
    val outputFormat = tableDefinition.storage.outputFormat.get
    // Known native file formats use STORED AS; anything else is treated as a
    // storage-handler class name and emitted with STORED BY '<class>'.
    val nativeFormats = Set("PARQUET", "TEXTFILE", "ORC", "RCFILE", "AVRO", "SEQUENCEFILE")
    if (nativeFormats.contains(outputFormat.toUpperCase)) {
      sb.append(s" STORED AS $outputFormat")
    } else {
      sb.append(s" STORED BY '$outputFormat'")
    }
    if (tableDefinition.storage.properties.nonEmpty) {
      // BUG FIX: mkString directly on a Map renders entries as "key -> value",
      // producing invalid DDL. Format as 'k'='v' to match TBLPROPERTIES below.
      val serdeProps = tableDefinition.storage.properties
        .map { case (k, v) => s"'$k'='$v'" }
        .mkString(" WITH SERDEPROPERTIES (", ",", ")")
      sb.append(serdeProps)
    }
    sb.append(s" LOCATION '${tableDefinition.storage.locationUri.get.toString}'")
  } else {
    // TODO: support STORED AS for managed (non-external) tables.
    // tableDefinition.storage.outputFormat foreach (format => sb.append(s" STORED AS $format"))
  }

  // TBLPROPERTIES ('k'='v', ...)
  if (tableDefinition.properties.nonEmpty) {
    val props = tableDefinition.properties
      .map { case (k, v) => s"'$k'='$v'" }
      .mkString("(", ",", ")")
    sb.append(" TBLPROPERTIES ").append(props)
  }
  sb.append(';')
  sb.toString
}