in integration/spark/src/main/spark3.1/org/apache/spark/sql/SparkVersionAdapter.scala [257:418]
/**
 * Builds the logical plan for a CREATE TABLE / CTAS statement on a Carbon table
 * (Spark 3.1 parser adaptation).
 *
 * Validates CTAS restrictions (no explicit schema, no partitioning, no external),
 * streaming/partition compatibility, duplicate columns and deprecated properties,
 * then produces either a [[CarbonCreateTableAsSelectCommand]] or a
 * [[CarbonCreateTableCommand]]. For external tables the schema is read (or
 * inferred) from the table path instead of the statement.
 *
 * @param createTableTuple parser contexts and raw tokens of the CREATE statement
 *                         (header, skew/bucket specs, partition/column lists,
 *                         properties, location, comment, CTAS node, query, provider)
 * @param extraTableTuple  pre-extracted pieces: schema fields, external flag,
 *                         identifier, IF NOT EXISTS flag, column names, path,
 *                         table properties, options, partition fields, parser,
 *                         session and optional CTAS plan
 * @return the Carbon create-table logical plan
 */
def createCarbonTable(createTableTuple: (CreateTableHeaderContext, SkewSpecContext,
    BucketSpecContext, PartitionFieldListContext, ColTypeListContext, TablePropertyListContext,
    LocationSpecContext, Option[String], TerminalNode, QueryContext, String),
    extraTableTuple: (Seq[StructField], Boolean, TableIdentifier, Boolean, Seq[String],
    Option[String], mutable.Map[String, String], Map[String, String], Seq[StructField],
    Seq[PartitionerField], CarbonSpark2SqlParser, SparkSession,
    Option[LogicalPlan])): LogicalPlan = {
  val (tableHeader, skewSpecContext, bucketSpecContext, partitionColumns, columns,
    tablePropertyList, locationSpecContext, tableComment, ctas, query, provider) = createTableTuple
  val (cols, external, tableIdentifier, ifNotExists, colNames, tablePath,
    tableProperties, properties, partitionByStructFields, partitionFields,
    parser, sparkSession, selectQuery) = extraTableTuple
  val options = new CarbonOption(properties)
  // validate streaming property
  validateStreamingProperty(options)
  // With Spark 3.1, partitioned columns can already be present in the schema.
  // Remove them from the fields and re-append the partition columns at the end.
  val updatedCols = cols.filterNot(x => partitionByStructFields.contains(x))
  var fields = parser.getFields(updatedCols ++ partitionByStructFields)
  // validate restrictions specific to CREATE TABLE AS SELECT
  selectQuery match {
    case Some(q) =>
      // CTAS does not allow creation of a partitioned table
      if (partitionFields.nonEmpty) {
        val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " +
          "create a partitioned table using Carbondata file formats."
        operationNotAllowed(errorMessage, partitionColumns)
      }
      // CTAS does not allow an explicitly specified schema
      if (fields.nonEmpty) {
        operationNotAllowed(
          "Schema may not be specified in a Create Table As Select (CTAS) statement", columns)
      }
      // external table is not allowed with CTAS
      if (external) {
        operationNotAllowed("Create external table as select", tableHeader)
      }
      // derive the schema from the SELECT query's resolved relation
      fields = parser.getFields(CarbonEnv.getInstance(sparkSession).carbonMetaStore
        .getSchemaFromUnresolvedRelation(sparkSession, q))
    case _ =>
      // plain CREATE TABLE: nothing to validate here
  }
  val columnNames = fields.map(_.name.get)
  checkIfDuplicateColumnExists(columns, tableIdentifier, columnNames)
  if (partitionFields.nonEmpty && options.isStreaming) {
    operationNotAllowed("Streaming is not allowed on partitioned table", partitionColumns)
  }
  if (!external && fields.isEmpty) {
    throw new MalformedCarbonCommandException("Creating table without column(s) is not supported")
  }
  if (external && fields.isEmpty && tableProperties.nonEmpty) {
    // as fields are always zero for external table, cannot validate table properties
    operationNotAllowed(
      "Table properties are not supported for external table", tablePropertyList)
  }
  // Global dictionary is deprecated since 2.0
  if (tableProperties.contains(CarbonCommonConstants.DICTIONARY_INCLUDE) ||
      tableProperties.contains(CarbonCommonConstants.DICTIONARY_EXCLUDE)) {
    DeprecatedFeatureException.globalDictNotSupported()
  }
  val bucketFields = parser.getBucketFields(tableProperties, fields, options)
  var isTransactionalTable: Boolean = true
  val tableInfo = if (external) {
    if (fields.nonEmpty) {
      // user-provided schema for an external table is not allowed currently,
      // see CARBONDATA-2866
      operationNotAllowed(
        "Schema must not be specified for external table", columns)
    }
    if (partitionByStructFields.nonEmpty) {
      operationNotAllowed(
        "Partition is not supported for external table", partitionColumns)
    }
    // read table info from the schema file in the provided table path;
    // external table names must also be lower-cased
    val identifier = AbsoluteTableIdentifier.from(
      tablePath.get,
      CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession).toLowerCase(),
      tableIdentifier.table.toLowerCase())
    val table = try {
      val schemaPath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
      if (!FileFactory.isFileExist(schemaPath)) {
        if (provider.equalsIgnoreCase("'carbonfile'")) {
          SchemaReader.inferSchema(identifier, true)
        } else {
          isTransactionalTable = false
          SchemaReader.inferSchema(identifier, false)
        }
      } else {
        SchemaReader.getTableInfo(identifier)
      }
    } catch {
      // Only translate non-fatal failures into a parse error; let fatal JVM
      // errors (OOM, thread death, ...) propagate instead of being swallowed.
      case e if scala.util.control.NonFatal(e) =>
        operationNotAllowed(s"Invalid table path provided: ${ tablePath.get } ", tableHeader)
    }
    // set "_external" property, so that DROP TABLE will not delete the data
    if (provider.equalsIgnoreCase("'carbonfile'")) {
      table.getFactTable.getTableProperties.put("_filelevelformat", "true")
      table.getFactTable.getTableProperties.put("_external", "false")
    } else {
      table.getFactTable.getTableProperties.put("_external", "true")
      table.getFactTable.getTableProperties.put("_filelevelformat", "false")
    }
    val factTableProperties = table.getFactTable.getTableProperties
    // default the local-dictionary flag from the system property when the
    // table itself does not declare it
    if (null == factTableProperties.get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)) {
      factTableProperties.put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
        CarbonProperties.getInstance()
          .getProperty(CarbonCommonConstants.LOCAL_DICTIONARY_SYSTEM_ENABLE,
            CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT))
    }
    val localDictEnabled = factTableProperties.get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)
    if (CarbonScalaUtil.validateLocalDictionaryEnable(localDictEnabled) &&
        localDictEnabled.toBoolean) {
      // enable local dictionary on all string-typed columns
      val allColumns = table.getFactTable.getListOfColumns
      for (i <- 0 until allColumns.size()) {
        val column = allColumns.get(i)
        if (column.getDataType == DataTypes.STRING || column.getDataType == DataTypes.VARCHAR) {
          column.setLocalDictColumn(true)
        }
      }
      table.getFactTable.setListOfColumns(allColumns)
    }
    table
  } else {
    // prepare table model of the collected tokens
    val tableModel: TableModel = CarbonParserUtil.prepareTableModel(
      ifNotExists,
      convertDbNameToLowerCase(tableIdentifier.database),
      tableIdentifier.table.toLowerCase,
      fields,
      partitionFields,
      tableProperties,
      bucketFields,
      isAlterFlow = false,
      tableComment)
    TableNewProcessor(tableModel)
  }
  tableInfo.setTransactionalTable(isTransactionalTable)
  selectQuery match {
    case Some(q) =>
      CarbonCreateTableAsSelectCommand(
        tableInfo = tableInfo,
        query = q,
        ifNotExistsSet = ifNotExists,
        tableLocation = tablePath)
    case _ =>
      CarbonCreateTableCommand(
        tableInfo = tableInfo,
        ifNotExistsSet = ifNotExists,
        tableLocation = tablePath,
        external)
  }
}