def prepareTableModel()

in integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala [210:434]


  /**
   * Validates the parsed table definition and assembles the [[TableModel]] for it.
   *
   * The checks run in a fixed order and the first failing one aborts the call, so the order of
   * the sections below determines which error the user sees.
   *
   * Side effects: `tableProperties` is updated in place (LOCAL_DICTIONARY_ENABLE and
   * LOCAL_DICTIONARY_THRESHOLD may be defaulted here, and the validation helpers receive the same
   * mutable map), and `schemaOrdinal` is assigned on every field.
   *
   * @param ifNotExistPresent passed through unchanged to the resulting model
   * @param dbName            database name; the default database name is used for
   *                          column_meta_cache validation when it is not given
   * @param tableName         name of the table
   * @param fields            columns declared by the user
   * @param partitionCols     partition columns
   * @param tableProperties   table properties; read and modified in place
   * @param bucketFields      passed through unchanged to the resulting model
   * @param isAlterFlow       when true, LOCAL_DICTIONARY_ENABLE is not defaulted from the system
   *                          level property if the table property is absent
   * @param tableComment      passed through unchanged to the resulting model
   * @return the table model built from the validated inputs
   * @throws MalformedCarbonCommandException if any of the property combinations checked in this
   *                                         method is invalid
   */
  def prepareTableModel(
      ifNotExistPresent: Boolean,
      dbName: Option[String],
      tableName: String,
      fields: Seq[Field],
      partitionCols: Seq[PartitionerField],
      tableProperties: Map[String, String],
      bucketFields: Option[BucketFields],
      isAlterFlow: Boolean = false,
      tableComment: Option[String] = None): TableModel = {

    // Process spatial index property; the resulting index fields are placed ahead of the
    // user declared fields
    val indexFields = processSpatialIndexProperty(tableProperties, fields)
    val allFields = indexFields ++ fields

    // do not allow below key words as column name
    validateColumnNames(allFields)
    CommonUtil.validateForSpatialTypeColumn(tableProperties)

    // schema ordinal follows the position in the combined (index fields + fields) list
    allFields.zipWithIndex.foreach { case (field, index) =>
      field.schemaOrdinal = index
    }

    // If sort_scope is not no_sort && sort_columns specified by user is empty, then throw exception
    val sortColumnsIsEmpty = tableProperties.get(CarbonCommonConstants.SORT_COLUMNS)
      .exists(_.equalsIgnoreCase(""))
    tableProperties.get(CarbonCommonConstants.SORT_SCOPE).foreach { sortScope =>
      if (sortColumnsIsEmpty && !sortScope.equalsIgnoreCase(SortScope.NO_SORT.name())) {
        throw new MalformedCarbonCommandException(
          s"Cannot set SORT_COLUMNS as empty when SORT_SCOPE is $sortScope ")
      }
    }
    val (dims, msrs, noDictionaryDims, sortKeyDims, varcharColumns) = extractDimAndMsrFields(
      fields, indexFields, tableProperties)

    // column properties
    val colProps = extractColumnProperties(fields, tableProperties)

    // validate the local dictionary property if defined; an invalid value is replaced by the
    // default instead of failing the command
    if (tableProperties.contains(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)) {
      if (!CarbonScalaUtil.validateLocalDictionaryEnable(
        tableProperties(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE))) {
        tableProperties.put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
          CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT)
      }
    } else if (!isAlterFlow) {
      // if LOCAL_DICTIONARY_ENABLE is not defined, try to get from system level property
      tableProperties.put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
        CarbonProperties.getInstance()
          .getProperty(CarbonCommonConstants.LOCAL_DICTIONARY_SYSTEM_ENABLE,
            CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT))
    }

    // validate the local dictionary threshold property if defined; an invalid value is replaced
    // by the default
    tableProperties.get(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD).foreach { threshold =>
      if (!CarbonScalaUtil.validateLocalDictionaryThreshold(threshold)) {
        LOGGER.debug(
          "invalid value is configured for local_dictionary_threshold, considering the " +
          "default value")
        tableProperties.put(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
          CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT)
      }
    }

    // the local dictionary include/exclude columns are validated only when local dictionary is
    // enabled, otherwise they are not looked at
    val localDictionaryEnabled = tableProperties
      .get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)
      .exists(_.trim.equalsIgnoreCase("true"))
    if (localDictionaryEnabled) {
      // presence of both properties is captured before any validation helper is invoked
      val isLocalDictIncludeDefined =
        tableProperties.contains(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE)
      val isLocalDictExcludeDefined =
        tableProperties.contains(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE)
      if (isLocalDictIncludeDefined) {
        val localDictIncludeColumns: Seq[String] =
          tableProperties(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE).split(",").map(_.trim)
        // validate all the local dictionary include columns
        CarbonScalaUtil.validateLocalConfiguredDictionaryColumns(
          fields, tableProperties, localDictIncludeColumns)
      }
      if (isLocalDictExcludeDefined) {
        val localDictExcludeColumns: Seq[String] =
          tableProperties(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE).split(",").map(_.trim)
        // validate all the local dictionary exclude columns
        CarbonScalaUtil.validateLocalConfiguredDictionaryColumns(
          fields, tableProperties, localDictExcludeColumns)
      }

      // validate if both local dictionary include and exclude contains same column
      CarbonScalaUtil.validateDuplicateColumnsForLocalDict(tableProperties)
    }

    // get no inverted index columns from table properties.
    val noInvertedIdxCols = extractNoInvertedIndexColumns(fields, tableProperties)
    // get inverted index columns from table properties
    val invertedIdxCols = extractInvertedIndexColumns(fields, tableProperties)

    // Validate if columns present in inverted index are part of sort columns.
    invertedIdxCols.foreach { column =>
      if (!sortKeyDims.contains(column)) {
        throw new MalformedCarbonCommandException(
          s"INVERTED_INDEX column: $column should be present in SORT_COLUMNS")
      }
    }

    // check for any duplicate columns in inverted and noinverted columns defined in tblproperties
    val hasAmbiguousIndexColumn = invertedIdxCols.exists { distCol =>
      noInvertedIdxCols.exists(x => x.equalsIgnoreCase(distCol.trim))
    }
    if (hasAmbiguousIndexColumn) {
      // NOTE(review): the check above ignores case and surrounding blanks while the list below is
      // computed with exact matching, so the reported list may be empty when the names differ only
      // by case - confirm whether the extract* helpers already normalise the names.
      val allIndexCols = invertedIdxCols ++ noInvertedIdxCols
      val duplicateColumns = allIndexCols.diff(allIndexCols.distinct).distinct.mkString(",")
      throw new MalformedCarbonCommandException(
        s"Column ambiguity as duplicate column(s):$duplicateColumns" +
        " is present in INVERTED_INDEX " +
        "and NO_INVERTED_INDEX. Duplicate columns are not allowed.")
    }

    if (tableProperties.contains(CarbonCommonConstants.COLUMN_META_CACHE)) {
      // validate the column_meta_cache option
      val tableColumns = dims.view.filterNot(_.spatialIndex).map(x => x.name.get) ++
                         msrs.map(x => x.name.get)
      CommonUtil.validateColumnMetaCacheFields(
        dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME),
        tableName,
        tableColumns,
        tableProperties(CarbonCommonConstants.COLUMN_META_CACHE),
        tableProperties)
      // the property is read again on purpose: the validation above receives the mutable map
      val columnsToBeCached = tableProperties(CarbonCommonConstants.COLUMN_META_CACHE)
      if (columnsToBeCached.nonEmpty) {
        columnsToBeCached.split(",").foreach { column =>
          // first match is taken as each column will have a unique name
          dims.find(x => x.name.get.equals(column)).foreach { dimField =>
            val dataType = dimField.dataType.get
            if (isComplexType(dataType)) {
              // complex type columns cannot be cached
              val errorMessage =
                s"$column is a complex type column and complex type is not allowed for " +
                s"the option(s): ${ CarbonCommonConstants.COLUMN_META_CACHE }"
              throw new MalformedCarbonCommandException(errorMessage)
            } else if (DataTypes.BINARY.getName.equalsIgnoreCase(dataType)) {
              // binary columns cannot be cached either
              val errorMessage =
                s"$column is a binary data type column and binary data type is not allowed for " +
                s"the option(s): ${ CarbonCommonConstants.COLUMN_META_CACHE }"
              throw new MalformedCarbonCommandException(errorMessage)
            }
          }
        }
      }
    }
    // validate the cache level
    tableProperties.get(CarbonCommonConstants.CACHE_LEVEL).foreach { cacheLevel =>
      CommonUtil.validateCacheLevel(cacheLevel, tableProperties)
    }
    // long_string_columns columns cannot be in no_inverted_index columns
    // (names are compared in upper case, i.e. case-insensitively)
    val longStringColumns = varcharColumns.map(_.toUpperCase)
    val noInvColIntersectLongStrCols = longStringColumns
      .intersect(noInvertedIdxCols.map(_.toUpperCase))
    if (noInvColIntersectLongStrCols.nonEmpty) {
      throw new MalformedCarbonCommandException(
        s"Column(s): ${
          noInvColIntersectLongStrCols.mkString(",")
        } both in no_inverted_index and long_string_columns which is not allowed.")
    }
    // long_string_columns columns cannot be in partition columns
    val partitionColIntersectLongStrCols = longStringColumns
      .intersect(partitionCols.map(col => col.partitionColumn.toUpperCase))
    if (partitionColIntersectLongStrCols.nonEmpty) {
      throw new MalformedCarbonCommandException(
        s"Column(s): ${
          partitionColIntersectLongStrCols.mkString(",")
        } both in partition and long_string_columns which is not allowed.")
    }
    // validate the block size and blocklet size, page size in table properties
    CommonUtil.validateSize(tableProperties, CarbonCommonConstants.TABLE_BLOCKSIZE)
    CommonUtil.validateSize(tableProperties, CarbonCommonConstants.TABLE_BLOCKLET_SIZE)
    CommonUtil.validatePageSizeInmb(tableProperties, CarbonCommonConstants.TABLE_PAGE_SIZE_INMB)
    // validate table level properties for compaction
    CommonUtil.validateTableLevelCompactionProperties(tableProperties)
    // validate flat folder property.
    CommonUtil.validateFlatFolder(tableProperties)
    // validate load_min_size_inmb property
    CommonUtil.validateLoadMinSize(tableProperties,
      CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB)
    // validate cache expiration time
    CommonUtil.validateCacheExpiration(tableProperties,
      CarbonCommonConstants.INDEX_CACHE_EXPIRATION_TIME_IN_SECONDS)

    // the model gets an immutable snapshot of the (possibly updated) table properties
    TableModel(
      ifNotExistPresent,
      dbName,
      tableName,
      tableProperties.toMap,
      reorderDimensions(dims.map(f => normalizeType(f)).map(f => addParent(f)), varcharColumns),
      msrs.map(f => normalizeType(f)),
      Option(sortKeyDims),
      Option(varcharColumns),
      Option(noDictionaryDims),
      Option(noInvertedIdxCols),
      Option(invertedIdxCols),
      Some(colProps),
      bucketFields,
      getPartitionInfo(partitionCols),
      tableComment)
  }