override def processMetadata()

in integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala [47:196]


  /**
   * Creates the metadata for the table described by `tableInfo`.
   *
   * The method resolves the database name, records audit information and stamps the database
   * name and unique table name onto `tableInfo`. If the table is not yet known to the session
   * catalog it then computes the table path, validates the table properties, fires a
   * [[CreateTablePreExecutionEvent]], optionally (when `createDSTable` is set) registers the
   * table by running a `CREATE TABLE ... USING carbondata` statement, and finally fires a
   * [[CreateTablePostExecutionEvent]].
   *
   * @param sparkSession session used for the catalog lookup, Hadoop configuration and SQL
   * @return always an empty sequence
   * @throws TableAlreadyExistsException if the catalog already has the table and
   *                                     `ifNotExistsSet` is false
   * @throws AnalysisException           rethrown from the CREATE TABLE statement unless it is an
   *                                     "already exists" failure and `ifNotExistsSet` is true
   */
  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    val tableName = tableInfo.getFactTable.getTableName
    ThreadLocalSessionInfo
      .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
    // Option(...) maps a null database name to None, so no mutable placeholder is needed.
    val databaseOpt: Option[String] = Option(tableInfo.getDatabaseName)
    val dbName = CarbonEnv.getDatabaseName(databaseOpt)(sparkSession)
    setAuditTable(dbName, tableName)
    setAuditInfo(tableInfo.getFactTable.getTableProperties.asScala.toMap
                 ++ Map("external" -> isExternal.toString))
    // set dbName and tableUnique Name in the table info
    tableInfo.setDatabaseName(dbName)
    tableInfo.setTableUniqueName(CarbonTable.buildUniqueName(dbName, tableName))
    val isTransactionalTable = tableInfo.isTransactionalTable
    if (sparkSession.sessionState.catalog
      .tableExists(TableIdentifier(tableName, Some(dbName)))) {
      // Existing table: only an error when IF NOT EXISTS was not requested.
      if (!ifNotExistsSet) {
        throw new TableAlreadyExistsException(dbName, tableName)
      }
    } else {
      val tablePath = CarbonEnv.createTablePath(
        Some(dbName),
        tableName,
        tableInfo.getFactTable.getTableId,
        tableLocation,
        isExternal,
        isTransactionalTable
      )(sparkSession)
      tableInfo.setTablePath(tablePath)
      CarbonSparkSqlParserUtil.validateTableProperties(tableInfo)
      val tableIdentifier = AbsoluteTableIdentifier
        .from(tablePath, dbName, tableName, tableInfo.getFactTable.getTableId)
      val operationContext = new OperationContext
      val createTablePreExecutionEvent: CreateTablePreExecutionEvent =
        CreateTablePreExecutionEvent(sparkSession, tableIdentifier, Some(tableInfo))
      OperationListenerBus.getInstance.fireEvent(createTablePreExecutionEvent, operationContext)
      val catalog = CarbonEnv.getInstance(sparkSession).carbonMetaStore
      val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tableIdentifier)
      if (createDSTable) {
        try {
          // Path as reported by the identifier; named distinctly so it does not shadow the
          // outer `tablePath`, which the catch block below still refers to.
          val resolvedTablePath = tableIdentifier.getTablePath
          val carbonRelation = CarbonSparkUtil.createCarbonRelation(tableInfo, resolvedTablePath)
          val rawSchema = CarbonSparkUtil.getRawSchema(carbonRelation)
          SparkUtil.setNullExecutionId(sparkSession)
          val partitionInfo = tableInfo.getFactTable.getPartitionInfo
          // A PARTITIONED BY clause is emitted only for native hive partitioning.
          val partitionString =
            if (partitionInfo != null &&
                partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
              val partitionColumns = partitionInfo.getColumnSchemaList.asScala
                .map(_.getColumnName.toLowerCase)
                .mkString(",")
              s" PARTITIONED BY (${partitionColumns})"
            } else {
              ""
            }

          // add carbon properties into option list in addition to carbon default properties
          // Keys listed here are written explicitly in the OPTIONS clause below (or come from
          // carbonSchemaString), so they are dropped from the user supplied properties.
          val repeatedPropKeys =
            Seq("tablename",
              "dbname",
              "tablePath",
              "isExternal",
              "path",
              "isTransactional",
              "isVisible",
              "carbonSchemaPartsNo")
          val tableProperties =
            tableInfo
              .getFactTable
              .getTableProperties
              .asScala
              .filterNot { case (key, _) => repeatedPropKeys.exists(_.equalsIgnoreCase(key)) }
              .map { case (key, value) =>
                s"""  ${ key }  "${ value }","""
              }
              .mkString("\n", "\n", "")

          // synchronized to prevent concurrently creation of table with same name
          CarbonCreateTableCommand.synchronized {
            // isVisible property is added to hive table properties to differentiate between main
            // table and mv. It is false only for mv's. This is added
            // to improve the show tables performance when filtering the MV from main tables
            sparkSession.sql(
              s"""CREATE TABLE $dbName.$tableName
                 |(${ rawSchema })
                 |USING carbondata
                 |OPTIONS (${tableProperties}
                 |  tableName "$tableName",
                 |  dbName "$dbName",
                 |  tablePath "$resolvedTablePath",
                 |  path "${FileFactory.addSchemeIfNotExists(resolvedTablePath)}",
                 |  isExternal "$isExternal",
                 |  isTransactional "$isTransactionalTable",
                 |  isVisible "$isVisible"
                 |  $carbonSchemaString)
                 |  $partitionString
             """.stripMargin).collect()
          }
        } catch {
          case e: AnalysisException =>
            // AnalysisException thrown with table already exists message in case of
            // concurrent drivers
            if (e.getMessage().contains("already exists")) {

              // Clear the cache first
              CarbonEnv.getInstance(sparkSession).carbonMetaStore
                .removeTableFromMetadata(dbName, tableName)

              // Delete the folders created by this call if the actual path is different
              val actualPath = CarbonEnv
                .getCarbonTable(TableIdentifier(tableName, Option(dbName)))(sparkSession)
                .getTablePath

              if (!actualPath.equalsIgnoreCase(tablePath)) {
                LOGGER
                  .error(
                    "TableAlreadyExists with path : " + actualPath + " So, deleting " + tablePath)
                FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(tablePath))
              }

              // No need to throw for create if not exists
              if (ifNotExistsSet) {
                LOGGER.error(e, e)
              } else {
                LOGGER.error(e)
                throw e
              }
            } else {
              LOGGER.error(e)
              throw e
            }

          case e: Exception =>
            // call the drop table to delete the created table.
            try {
              CarbonEnv.getInstance(sparkSession).carbonMetaStore
                .dropTable(tableIdentifier)(sparkSession)
            } catch {
              // Best-effort cleanup: a failure here is ignored so the original error below
              // is the one that reaches the caller.
              case _: Exception => // No operation
            }
            throw e
        }
      }
      val createTablePostExecutionEvent: CreateTablePostExecutionEvent =
        CreateTablePostExecutionEvent(sparkSession, tableIdentifier)
      OperationListenerBus.getInstance.fireEvent(createTablePostExecutionEvent, operationContext)
    }
    Seq.empty
  }