def buildTableInfoFromCatalogTable()

in integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala [148:319]


  /**
   * Builds the Carbon `TableInfo` for a CREATE TABLE request described by a Spark
   * `CatalogTable`.
   *
   * The method validates the request (streaming property, CTAS restrictions, presence of
   * columns, table properties on schema-less external tables) and then produces the table info:
   *  - for a non-external table, from the declared (or CTAS-derived) fields;
   *  - for an external table, from the schema file under the table path if it exists,
   *    otherwise by inferring the schema from the path; if that fails and fields were
   *    declared, it falls back to building the table info from those fields.
   *
   * @param table        catalog description of the table being created; `table.provider` must be
   *                     defined, otherwise `Option.get` fails with `NoSuchElementException`
   * @param ifNotExists  forwarded unchanged to `CarbonParserUtil.prepareTableModel`
   * @param sparkSession session used to resolve the database name and the CTAS query schema
   * @param selectQuery  the query of a CREATE TABLE AS SELECT statement, if any
   * @return the table info with its table path and transactional flag set
   * @throws MalformedCarbonCommandException if CTAS is combined with partition columns, an
   *         explicit schema or an external table; if streaming is requested on a partitioned
   *         table; if a non-external table has no columns; if a schema-less external table
   *         carries table properties; or if the external table schema cannot be read and no
   *         fields were declared
   */
  def buildTableInfoFromCatalogTable(
      table: CatalogTable,
      ifNotExists: Boolean,
      sparkSession: SparkSession,
      selectQuery: Option[LogicalPlan] = None): TableInfo = {
    val tableProperties = normalizeProperties(getProperties(table))
    val options = new CarbonOption(Map(tableProperties.toSeq: _*))
    // validate streaming property
    validateStreamingProperty(options)
    val parser = new CarbonSpark2SqlParser()
    val isExternal = table.tableType == CatalogTableType.EXTERNAL
    val declaredFields = parser.getFields(table.schema.fields, isExternal)
    val provider = table.provider.get
    val partitionColumnNames = table.partitionColumnNames.map(_.toLowerCase)

    // For create table as select the fields come from the query, after validating that the
    // statement does not use anything CTAS disallows; otherwise the declared fields are used.
    val fields = selectQuery match {
      case Some(q) =>
        // create table as select does not allow creation of partitioned table
        if (partitionColumnNames.nonEmpty) {
          throw new MalformedCarbonCommandException(
            "A Create Table As Select (CTAS) statement is not allowed to " +
            "create a partitioned table using Carbondata file formats.")
        }
        // create table as select does not allow to explicitly specify schema
        if (declaredFields.nonEmpty) {
          throw new MalformedCarbonCommandException(
            "Schema can not be specified in a Create Table As Select (CTAS) statement")
        }
        // external table is not allowed
        if (isExternal) {
          throw new MalformedCarbonCommandException(
            "Create external table as select is not allowed")
        }
        parser
          .getFields(CarbonEnv.getInstance(sparkSession).carbonMetaStore
            .getSchemaFromUnresolvedRelation(sparkSession, q))
      case _ =>
        // not a CTAS statement
        declaredFields
    }

    if (partitionColumnNames.nonEmpty && options.isStreaming) {
      throw new MalformedCarbonCommandException(
        "Streaming is not allowed on partitioned table")
    }
    if (!isExternal && fields.isEmpty) {
      throw new MalformedCarbonCommandException(
        "Creating table without column(s) is not supported")
    }
    // filter out internally added external keyword property
    val newTableProperties = tableProperties.filterNot(_._1.equalsIgnoreCase("hasexternalkeyword"))
    if (isExternal && fields.isEmpty && newTableProperties.nonEmpty) {
      // without fields there is nothing to validate the table properties against
      throw new MalformedCarbonCommandException(
        "Table properties are not supported for external table")
    }

    // validate tblProperties
    val tblProperties = mutable.Map(tableProperties.toSeq: _*)
    val bucketFields =
      parser.getBucketFields(tblProperties, fields, options)
    var isTransactionalTable: Boolean = true
    // table must convert database name and table name to lower case
    val identifier = AbsoluteTableIdentifier.from(
      CarbonUtil.checkAndAppendFileSystemURIScheme(table.storage
        .locationUri
        .map(CatalogUtils.URIToString)
        .getOrElse("")),
      CarbonEnv.getDatabaseName(table.identifier.database)(sparkSession).toLowerCase(),
      table.identifier.table.toLowerCase()
    )

    // Builds the table info from the collected fields and table properties. Shared by the
    // non-external path and by the external-table fallback when the schema cannot be read.
    def tableInfoFromFields(): TableInfo = {
      val partitionerFields = fields
        .filter(field => partitionColumnNames.contains(field.column))
        .map(field => PartitionerField(field.column, field.dataType, null))
      // prepare table model of the collected tokens
      val tableModel: TableModel = CarbonParserUtil.prepareTableModel(
        ifNotExists,
        Option(identifier.getDatabaseName),
        identifier.getTableName,
        fields,
        partitionerFields,
        tblProperties,
        bucketFields,
        isAlterFlow = false,
        table.comment
      )
      TableNewProcessor(tableModel)
    }

    val tableInfo = if (isExternal) {
      // read table info from schema file in the provided table path
      val externalTableInfo =
        try {
          val schemaPath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
          if (FileFactory.isFileExist(schemaPath)) {
            SchemaReader.getTableInfo(identifier)
          } else if (provider.equalsIgnoreCase("'carbonfile'")) {
            // NOTE(review): the provider is compared including the single quotes - confirm that
            // this matches how the provider string is produced by the caller.
            SchemaReader.inferSchema(identifier, true)
          } else {
            isTransactionalTable = false
            SchemaReader.inferSchema(identifier, false)
          }
        } catch {
          // Only recover from non-fatal failures; fatal JVM errors must propagate.
          case scala.util.control.NonFatal(e) =>
            if (fields.isEmpty) {
              val invalidPath = new MalformedCarbonCommandException(
                s"Invalid table path provided: ${ identifier.getTablePath } ")
              // keep the root cause so the reason the schema could not be read is not lost
              invalidPath.initCause(e)
              throw invalidPath
            }
            // the flag may have been cleared above before schema inference failed
            if (table.properties.contains("hasexternalkeyword")) {
              isTransactionalTable = true
            }
            tableInfoFromFields()
        }
      val factTable = externalTableInfo.getFactTable
      val factProperties = factTable.getTableProperties
      // Record the "_external" / "_filelevelformat" markers. Nothing is recorded when the
      // provider is not 'carbonfile' and the "hasexternalkeyword" property is present.
      // NOTE(review): presumably "_external" controls whether DROP TABLE keeps the data - confirm.
      if (provider.equalsIgnoreCase("'carbonfile'")) {
        factProperties.put("_filelevelformat", "true")
        factProperties.put("_external", "false")
      } else if (!table.properties.contains("hasexternalkeyword")) {
        factProperties.put("_external", "true")
        factProperties.put("_filelevelformat", "false")
      }
      // default the local dictionary setting from the system property when the table has none
      if (null == factProperties.get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)) {
        factProperties
          .put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
            CarbonProperties.getInstance()
              .getProperty(CarbonCommonConstants.LOCAL_DICTIONARY_SYSTEM_ENABLE,
                CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT))
      }
      val localDictEnabled = factProperties.get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)
      if (CarbonScalaUtil.validateLocalDictionaryEnable(localDictEnabled) &&
          localDictEnabled.toBoolean) {
        // mark every STRING / VARCHAR column as a local dictionary column
        val allColumns = factTable.getListOfColumns
        for (i <- 0 until allColumns.size()) {
          val column = allColumns.get(i)
          if (column.getDataType == DataTypes.STRING || column.getDataType == DataTypes.VARCHAR) {
            column.setLocalDictColumn(true)
          }
        }
        factTable.setListOfColumns(allColumns)
      }
      externalTableInfo
    } else {
      tableInfoFromFields()
    }
    tableInfo.setTablePath(identifier.getTablePath)
    tableInfo.setTransactionalTable(isTransactionalTable)
    if (isTransactionalTable && isExternal) {
      // record the latest table status version found under the table path, if there is one
      val tblStatusVersion = CarbonScalaUtil.getLatestTableStatusVersion(identifier.getTablePath)
      if (tblStatusVersion.nonEmpty) {
        tableInfo.getFactTable.getTableProperties.put("latestversion", tblStatusVersion)
      }
    }
    tableInfo
  }