in integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala [210:434]
def prepareTableModel(
ifNotExistPresent: Boolean,
dbName: Option[String],
tableName: String,
fields: Seq[Field],
partitionCols: Seq[PartitionerField],
tableProperties: Map[String, String],
bucketFields: Option[BucketFields],
isAlterFlow: Boolean = false,
tableComment: Option[String] = None): TableModel = {
// Process spatial index property
val indexFields = processSpatialIndexProperty(tableProperties, fields)
val allFields = indexFields ++ fields
// do not allow reserved keywords as column names
validateColumnNames(allFields)
CommonUtil.validateForSpatialTypeColumn(tableProperties)
allFields.zipWithIndex.foreach { case (field, index) =>
field.schemaOrdinal = index
}
// if SORT_COLUMNS is specified as empty and SORT_SCOPE is not NO_SORT, throw an exception
if (tableProperties.get(CarbonCommonConstants.SORT_COLUMNS).isDefined
&& tableProperties(CarbonCommonConstants.SORT_COLUMNS).equalsIgnoreCase("") &&
tableProperties.get(CarbonCommonConstants.SORT_SCOPE).isDefined &&
!tableProperties(CarbonCommonConstants.SORT_SCOPE)
.equalsIgnoreCase(SortScope.NO_SORT.name())) {
throw new MalformedCarbonCommandException(
s"Cannot set SORT_COLUMNS as empty when SORT_SCOPE is ${
tableProperties(CarbonCommonConstants.SORT_SCOPE)
} ")
}
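// illustration (hypothetical property values): 'SORT_COLUMNS'='' with
// 'SORT_SCOPE'='LOCAL_SORT' hits the branch above and is rejected, whereas an empty
// SORT_COLUMNS combined with 'SORT_SCOPE'='NO_SORT' is accepted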
val (dims, msrs, noDictionaryDims, sortKeyDims, varcharColumns) = extractDimAndMsrFields(
fields, indexFields, tableProperties)
// column properties
val colProps = extractColumnProperties(fields, tableProperties)
// validate the local dictionary property if defined
if (tableProperties.get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE).isDefined) {
if (!CarbonScalaUtil
.validateLocalDictionaryEnable(tableProperties(CarbonCommonConstants
.LOCAL_DICTIONARY_ENABLE))) {
tableProperties.put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT)
}
} else if (!isAlterFlow) {
// if LOCAL_DICTIONARY_ENABLE is not defined, fall back to the system-level property
tableProperties
.put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.LOCAL_DICTIONARY_SYSTEM_ENABLE,
CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT))
}
// validate the local dictionary threshold property if defined
if (tableProperties.get(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD).isDefined) {
if (!CarbonScalaUtil
.validateLocalDictionaryThreshold(tableProperties(CarbonCommonConstants
.LOCAL_DICTIONARY_THRESHOLD))) {
LOGGER.debug(
"invalid value is configured for local_dictionary_threshold, considering the " +
"default value")
tableProperties.put(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT)
}
}
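// net effect of the two blocks above: an invalid LOCAL_DICTIONARY_ENABLE or
// LOCAL_DICTIONARY_THRESHOLD falls back to its default value, and in the create flow a
// missing LOCAL_DICTIONARY_ENABLE is seeded from the system-level
// LOCAL_DICTIONARY_SYSTEM_ENABLE property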
// validate the local dictionary columns; this validation runs only when local dictionary
// is enabled, otherwise it is skipped
if (tableProperties.get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE).isDefined &&
tableProperties(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE).trim
.equalsIgnoreCase("true")) {
var localDictIncludeColumns: Seq[String] = Seq[String]()
var localDictExcludeColumns: Seq[String] = Seq[String]()
val isLocalDictIncludeDefined = tableProperties
.get(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE)
.isDefined
val isLocalDictExcludeDefined = tableProperties
.get(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE)
.isDefined
if (isLocalDictIncludeDefined) {
localDictIncludeColumns =
tableProperties(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE).split(",").map(_.trim)
// validate all the local dictionary include columns
CarbonScalaUtil
.validateLocalConfiguredDictionaryColumns(fields,
tableProperties,
localDictIncludeColumns)
}
if (isLocalDictExcludeDefined) {
localDictExcludeColumns =
tableProperties(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE).split(",").map(_.trim)
// validate all the local dictionary exclude columns
CarbonScalaUtil
.validateLocalConfiguredDictionaryColumns(fields,
tableProperties,
localDictExcludeColumns)
}
// validate that local dictionary include and exclude do not contain the same column
CarbonScalaUtil.validateDuplicateColumnsForLocalDict(tableProperties)
}
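// example (hypothetical values): 'LOCAL_DICTIONARY_ENABLE'='true',
// 'LOCAL_DICTIONARY_INCLUDE'='city,country', 'LOCAL_DICTIONARY_EXCLUDE'='city' would have
// both lists split on commas, each column validated against the schema, and the overlap on
// "city" rejected as a duplicate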
// get no inverted index columns from table properties.
val noInvertedIdxCols = extractNoInvertedIndexColumns(fields, tableProperties)
// get inverted index columns from table properties
val invertedIdxCols = extractInvertedIndexColumns(fields, tableProperties)
// Validate if columns present in inverted index are part of sort columns.
if (invertedIdxCols.nonEmpty) {
invertedIdxCols.foreach { column =>
if (!sortKeyDims.contains(column)) {
val errMsg = "INVERTED_INDEX column: " + column + " should be present in SORT_COLUMNS"
throw new MalformedCarbonCommandException(errMsg)
}
}
}
// check for any column duplicated between the inverted and no-inverted index lists defined in tblproperties
if (invertedIdxCols.nonEmpty && noInvertedIdxCols.nonEmpty) {
invertedIdxCols.foreach { distCol =>
if (noInvertedIdxCols.exists(x => x.equalsIgnoreCase(distCol.trim))) {
val duplicateColumns = (invertedIdxCols ++ noInvertedIdxCols)
.diff((invertedIdxCols ++ noInvertedIdxCols).distinct).distinct
val errMsg = "Column ambiguity as duplicate column(s):" +
duplicateColumns.mkString(",") +
" is present in INVERTED_INDEX " +
"and NO_INVERTED_INDEX. Duplicate columns are not allowed."
throw new MalformedCarbonCommandException(errMsg)
}
}
}
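// example (hypothetical values): 'INVERTED_INDEX'='name' is accepted only if "name" is also
// present in SORT_COLUMNS, and naming the same column in both 'INVERTED_INDEX' and
// 'NO_INVERTED_INDEX' raises the duplicate-column error above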
if (tableProperties.get(CarbonCommonConstants.COLUMN_META_CACHE).isDefined) {
// validate the column_meta_cache option
val tableColumns = dims.view.filterNot(_.spatialIndex).map(x => x.name.get) ++
msrs.map(x => x.name.get)
CommonUtil.validateColumnMetaCacheFields(
dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME),
tableName,
tableColumns,
tableProperties.get(CarbonCommonConstants.COLUMN_META_CACHE).get,
tableProperties)
val columnsToBeCached = tableProperties.get(CarbonCommonConstants.COLUMN_META_CACHE).get
if (columnsToBeCached.nonEmpty) {
columnsToBeCached.split(",").foreach { column =>
val dimFieldToBeCached = dims.filter(x => x.name.get.equals(column))
// first element is taken as each column will have a unique name
// check for complex type column
if (dimFieldToBeCached.nonEmpty &&
isComplexType(dimFieldToBeCached(0).dataType.get)) {
val errorMessage =
s"$column is a complex type column and complex type is not allowed for " +
s"the option(s): ${ CarbonCommonConstants.COLUMN_META_CACHE }"
throw new MalformedCarbonCommandException(errorMessage)
} else if (dimFieldToBeCached.nonEmpty && DataTypes.BINARY.getName
.equalsIgnoreCase(dimFieldToBeCached(0).dataType.get)) {
val errorMessage =
s"$column is a binary data type column and binary data type is not allowed for " +
s"the option(s): ${CarbonCommonConstants.COLUMN_META_CACHE}"
throw new MalformedCarbonCommandException(errorMessage)
}
}
}
}
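// example (hypothetical values): 'COLUMN_META_CACHE'='id,address' is rejected by the checks
// above when "address" resolves to a complex-type or binary column, since both are
// disallowed for this option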
// validate the cache level
if (tableProperties.get(CarbonCommonConstants.CACHE_LEVEL).isDefined) {
CommonUtil.validateCacheLevel(
tableProperties.get(CarbonCommonConstants.CACHE_LEVEL).get,
tableProperties)
}
// long_string_columns cannot also be no_inverted_index columns
var longStringColumns = varcharColumns.map(_.toUpperCase)
var noInvColIntersectLongStrCols = longStringColumns
.intersect(noInvertedIdxCols.map(_.toUpperCase))
if (!noInvColIntersectLongStrCols.isEmpty) {
throw new MalformedCarbonCommandException(
s"Column(s): ${
noInvColIntersectLongStrCols.mkString(",")
} both in no_inverted_index and long_string_columns which is not allowed.")
}
// long_string_columns cannot also be partition columns
var partitionColIntersecLongStrCols = longStringColumns
.intersect(partitionCols.map(col => col.partitionColumn.toUpperCase))
if (!partitionColIntersecLongStrCols.isEmpty) {
throw new MalformedCarbonCommandException(
s"Column(s): ${
partitionColIntersecLongStrCols.mkString(",")
} both in partition and long_string_columns which is not allowed.")
}
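// example (hypothetical values): a column declared in long_string_columns that also appears
// in NO_INVERTED_INDEX, or is used as a partition column, triggers the corresponding error
// above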
// validate the block size, blocklet size and page size in table properties
CommonUtil.validateSize(tableProperties, CarbonCommonConstants.TABLE_BLOCKSIZE)
CommonUtil.validateSize(tableProperties, CarbonCommonConstants.TABLE_BLOCKLET_SIZE)
CommonUtil.validatePageSizeInmb(tableProperties, CarbonCommonConstants.TABLE_PAGE_SIZE_INMB)
// validate table level properties for compaction
CommonUtil.validateTableLevelCompactionProperties(tableProperties)
// validate flat folder property.
CommonUtil.validateFlatFolder(tableProperties)
// validate load_min_size_inmb property
CommonUtil.validateLoadMinSize(tableProperties,
CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB)
// validate cache expiration time
CommonUtil.validateCacheExpiration(tableProperties,
CarbonCommonConstants.INDEX_CACHE_EXPIRATION_TIME_IN_SECONDS)
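// finally, assemble the TableModel: dimensions are normalized, linked to their parent
// complex columns and reordered taking the varchar (long string) columns into account,
// measures are normalized, and the validated column lists and properties are carried along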
TableModel(
ifNotExistPresent,
dbName,
tableName,
tableProperties.toMap,
reorderDimensions(dims.map(f => normalizeType(f)).map(f => addParent(f)), varcharColumns),
msrs.map(f => normalizeType(f)),
Option(sortKeyDims),
Option(varcharColumns),
Option(noDictionaryDims),
Option(noInvertedIdxCols),
Option(invertedIdxCols),
Some(colProps),
bucketFields: Option[BucketFields],
getPartitionInfo(partitionCols),
tableComment)
}
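// A minimal sketch (hypothetical helper, not part of the original source) of the mutable
// property map prepareTableModel consumes; the keys reuse the CarbonCommonConstants
// referenced above and the values are illustrative only.
private def exampleTableProperties: scala.collection.mutable.Map[String, String] =
  scala.collection.mutable.Map(
    CarbonCommonConstants.SORT_SCOPE -> "local_sort",
    CarbonCommonConstants.SORT_COLUMNS -> "id,name",
    CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE -> "true",
    CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE -> "name",
    CarbonCommonConstants.COLUMN_META_CACHE -> "id",
    CarbonCommonConstants.TABLE_BLOCKSIZE -> "512")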