in integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala [148:319]
/**
 * Builds the Carbon `TableInfo` for the table described by a Spark `CatalogTable`.
 *
 * For an external table the table info is read from the schema file under the table path,
 * or inferred through `SchemaReader.inferSchema` when no schema file exists. If that fails
 * with a non-fatal error and columns were declared, the table info is built from the declared
 * columns instead. For every other table it is built from the declared columns, or from the
 * schema of `selectQuery` in the Create Table As Select (CTAS) case.
 *
 * @param table        catalog description of the table being created
 * @param ifNotExists  forwarded unchanged to `CarbonParserUtil.prepareTableModel`
 * @param sparkSession session used to resolve the database name and the CTAS query schema
 * @param selectQuery  the query of a CTAS statement, if any
 * @return the table info, with table path and transactional flag set
 * @throws MalformedCarbonCommandException if CTAS is combined with partition columns, an
 *         explicit schema or an external table; if streaming is requested on a partitioned
 *         table; if a non-external table has no columns; if an external table without
 *         columns carries table properties; or if an external table without columns points
 *         at a path from which no table info can be read
 */
def buildTableInfoFromCatalogTable(
    table: CatalogTable,
    ifNotExists: Boolean,
    sparkSession: SparkSession,
    selectQuery: Option[LogicalPlan] = None): TableInfo = {
  import scala.util.control.NonFatal

  val tableProperties = normalizeProperties(getProperties(table))
  val options = new CarbonOption(Map(tableProperties.toSeq: _*))
  // validate streaming property
  validateStreamingProperty(options)
  val parser = new CarbonSpark2SqlParser()
  val isExternal = table.tableType == CatalogTableType.EXTERNAL
  val declaredFields = parser.getFields(table.schema.fields, isExternal)
  // NOTE(review): throws NoSuchElementException when the catalog table has no provider;
  // presumably callers always set one - confirm.
  val provider = table.provider.get
  val partitionColumnNames = table.partitionColumnNames.map(_.toLowerCase)
  // validate for create table as select; CTAS takes its columns from the query schema
  val fields = selectQuery match {
    case Some(q) =>
      // create table as select does not allow creation of partitioned table
      if (partitionColumnNames.nonEmpty) {
        throw new MalformedCarbonCommandException(
          "A Create Table As Select (CTAS) statement is not allowed to " +
          "create a partitioned table using Carbondata file formats.")
      }
      // create table as select does not allow to explicitly specify schema
      if (declaredFields.nonEmpty) {
        throw new MalformedCarbonCommandException(
          "Schema can not be specified in a Create Table As Select (CTAS) statement")
      }
      // external table is not allowed
      if (isExternal) {
        throw new MalformedCarbonCommandException(
          "Create external table as select is not allowed")
      }
      parser
        .getFields(CarbonEnv.getInstance(sparkSession).carbonMetaStore
          .getSchemaFromUnresolvedRelation(sparkSession, q))
    case _ =>
      // not a CTAS statement, keep the declared columns
      declaredFields
  }
  if (partitionColumnNames.nonEmpty && options.isStreaming) {
    throw new MalformedCarbonCommandException(
      "Streaming is not allowed on partitioned table")
  }
  if (!isExternal && fields.isEmpty) {
    throw new MalformedCarbonCommandException(
      "Creating table without column(s) is not supported")
  }
  // filter out internally added external keyword property
  val newTableProperties = tableProperties.filterNot(_._1.equalsIgnoreCase("hasexternalkeyword"))
  if (isExternal && fields.isEmpty && newTableProperties.nonEmpty) {
    // as fields are always zero for external table, cannot validate table properties.
    throw new MalformedCarbonCommandException(
      "Table properties are not supported for external table")
  }
  // validate tblProperties
  val tblProperties = mutable.Map(tableProperties.toSeq: _*)
  val bucketFields =
    parser.getBucketFields(tblProperties, fields, options)
  // Stays a var on purpose: the external branch below flips it before calling
  // SchemaReader.inferSchema and may flip it back in the fallback path.
  var isTransactionalTable: Boolean = true
  // table must convert database name and table name to lower case
  val identifier = AbsoluteTableIdentifier.from(
    CarbonUtil.checkAndAppendFileSystemURIScheme(table.storage
      .locationUri
      .map(CatalogUtils.URIToString)
      .getOrElse("")),
    CarbonEnv.getDatabaseName(table.identifier.database)(sparkSession).toLowerCase(),
    table.identifier.table.toLowerCase()
  )

  // Builds the table info from `fields`; shared by the non-external path and by the
  // external fallback path.
  def tableInfoFromFields() = {
    val partitionerFields = fields
      .filter(field => partitionColumnNames.contains(field.column))
      .map(field => PartitionerField(field.column, field.dataType, null))
    // prepare table model of the collected tokens
    val tableModel: TableModel = CarbonParserUtil.prepareTableModel(
      ifNotExists,
      Option(identifier.getDatabaseName),
      identifier.getTableName,
      fields,
      partitionerFields,
      tblProperties,
      bucketFields,
      isAlterFlow = false,
      table.comment
    )
    TableNewProcessor(tableModel)
  }

  val tableInfo = if (isExternal) {
    // read table info from schema file in the provided table path
    val externalTableInfo =
      try {
        val schemaPath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
        if (FileFactory.isFileExist(schemaPath)) {
          SchemaReader.getTableInfo(identifier)
        } else if (provider.equalsIgnoreCase("'carbonfile'")) {
          SchemaReader.inferSchema(identifier, true)
        } else {
          isTransactionalTable = false
          SchemaReader.inferSchema(identifier, false)
        }
      } catch {
        // Only recover from non-fatal failures; fatal JVM errors (OutOfMemoryError,
        // InterruptedException, ...) must propagate instead of being masked by the fallback.
        case NonFatal(_) =>
          if (fields.isEmpty) {
            throw new MalformedCarbonCommandException(
              s"Invalid table path provided: ${ identifier.getTablePath } ")
          }
          if (table.properties.contains("hasexternalkeyword")) {
            isTransactionalTable = true
          }
          tableInfoFromFields()
      }
    val factProperties = externalTableInfo.getFactTable.getTableProperties
    // set "_external" property, so that DROP TABLE will not delete the data
    if (provider.equalsIgnoreCase("'carbonfile'")) {
      factProperties.put("_filelevelformat", "true")
      factProperties.put("_external", "false")
    } else if (!table.properties.contains("hasexternalkeyword")) {
      factProperties.put("_external", "true")
      factProperties.put("_filelevelformat", "false")
    }
    // fall back to the system level setting when the table does not specify the property
    if (null == factProperties.get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)) {
      factProperties
        .put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
          CarbonProperties.getInstance()
            .getProperty(CarbonCommonConstants.LOCAL_DICTIONARY_SYSTEM_ENABLE,
              CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT))
    }
    val localDictEnabled = factProperties.get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)
    if (CarbonScalaUtil.validateLocalDictionaryEnable(localDictEnabled) &&
        localDictEnabled.toBoolean) {
      // mark every STRING / VARCHAR column as a local dictionary column
      val allColumns = externalTableInfo.getFactTable.getListOfColumns
      for (i <- 0 until allColumns.size()) {
        val column = allColumns.get(i)
        if (column.getDataType == DataTypes.STRING || column.getDataType == DataTypes.VARCHAR) {
          column.setLocalDictColumn(true)
        }
      }
      externalTableInfo.getFactTable.setListOfColumns(allColumns)
    }
    externalTableInfo
  } else {
    tableInfoFromFields()
  }
  tableInfo.setTablePath(identifier.getTablePath)
  tableInfo.setTransactionalTable(isTransactionalTable)
  if (isTransactionalTable && isExternal) {
    // record the latest table status version only when one is reported for the table path
    val tblStatusVersion = CarbonScalaUtil.getLatestTableStatusVersion(identifier.getTablePath)
    if (tblStatusVersion.nonEmpty) {
      tableInfo.getFactTable.getTableProperties.put("latestversion", tblStatusVersion)
    }
  }
  tableInfo
}