in integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala [47:196]
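// Creates the Carbon table's metadata: resolves the database and table path, validates
// table properties, fires pre/post create events, and registers the table with the
// session catalog through a nested CREATE TABLE ... USING carbondata statement.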
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val tableName = tableInfo.getFactTable.getTableName
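// make the session's Hadoop configuration available to Carbon code running on this
// thread (file access goes through FileFactory, which reads the thread-local)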
ThreadLocalSessionInfo
.setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
val databaseOpt: Option[String] = Option(tableInfo.getDatabaseName)
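// falls back to the session's current database when none was specified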
val dbName = CarbonEnv.getDatabaseName(databaseOpt)(sparkSession)
setAuditTable(dbName, tableName)
setAuditInfo(tableInfo.getFactTable.getTableProperties.asScala.toMap
++ Map("external" -> isExternal.toString))
// set the database name and table unique name in the table info
tableInfo.setDatabaseName(dbName)
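// the unique name ("<db>_<table>") is the key under which Carbon caches this table's metadata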
tableInfo.setTableUniqueName(CarbonTable.buildUniqueName(dbName, tableName))
val isTransactionalTable = tableInfo.isTransactionalTable
if (sparkSession.sessionState.catalog
.tableExists(TableIdentifier(tableName, Some(dbName)))) {
if (!ifNotExistsSet) {
throw new TableAlreadyExistsException(dbName, tableName)
}
} else {
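// table does not exist yet: derive its path (the explicit location for external
// tables, otherwise typically a directory under the database path)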
val tablePath = CarbonEnv.createTablePath(
Some(dbName),
tableName,
tableInfo.getFactTable.getTableId,
tableLocation,
isExternal,
isTransactionalTable
)(sparkSession)
tableInfo.setTablePath(tablePath)
CarbonSparkSqlParserUtil.validateTableProperties(tableInfo)
val tableIdentifier = AbsoluteTableIdentifier
.from(tablePath, dbName, tableName, tableInfo.getFactTable.getTableId)
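// let registered listeners (for example ACL or audit handlers) react before any
// metadata is persisted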
val operationContext = new OperationContext
val createTablePreExecutionEvent: CreateTablePreExecutionEvent =
CreateTablePreExecutionEvent(sparkSession, tableIdentifier, Some(tableInfo))
OperationListenerBus.getInstance.fireEvent(createTablePreExecutionEvent, operationContext)
val catalog = CarbonEnv.getInstance(sparkSession).carbonMetaStore
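// serialize the TableInfo for embedding in the Hive table properties; long schemas
// are presumably split into numbered parts (hence "carbonSchemaPartsNo" below) to
// respect the metastore's limit on property value size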
val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tableIdentifier)
if (createDSTable) {
try {
val tablePath = tableIdentifier.getTablePath
val carbonRelation = CarbonSparkUtil.createCarbonRelation(tableInfo, tablePath)
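// rawSchema renders the columns as a "name type, ..." list for the DDL below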
val rawSchema = CarbonSparkUtil.getRawSchema(carbonRelation)
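// clear the SQL execution id on this thread so the nested sql() call below runs
// as its own execution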
SparkUtil.setNullExecutionId(sparkSession)
val partitionInfo = tableInfo.getFactTable.getPartitionInfo
val partitionString =
if (partitionInfo != null &&
partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
s" PARTITIONED BY (${partitionInfo.getColumnSchemaList.asScala.map(
_.getColumnName.toLowerCase).mkString(",")})"
} else {
""
}
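// e.g. for partition columns (c1 string, c2 int) this renders as " PARTITIONED BY (c1,c2)"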
// forward the user-specified table properties into the OPTIONS list, skipping keys
// that are always set explicitly below
val repeatedPropKeys =
Seq("tablename",
"dbname",
"tablePath",
"isExternal",
"path",
"isTransactional",
"isVisible",
"carbonSchemaPartsNo")
val tableProperties =
tableInfo
.getFactTable
.getTableProperties
.asScala
.filter(prop => !repeatedPropKeys.exists(_.equalsIgnoreCase(prop._1)))
.map { property =>
s""" ${ property._1 } "${ property._2 }","""
}
.mkString("\n", "\n", "")
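// each remaining property becomes one OPTIONS line, e.g. 'sort_columns'='c1'
// renders as:  sort_columns "c1",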
// synchronized to prevent concurrent creation of tables with the same name
CarbonCreateTableCommand.synchronized {
// the isVisible property is added to the Hive table properties to differentiate main
// tables from MVs; it is false only for MVs. This speeds up SHOW TABLES when
// filtering MVs out of the main tables
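// for illustration, assuming a table default.t1 with a single column (name string),
// no user properties and tablePath /store/default/t1, the rendered statement is
// roughly:
//   CREATE TABLE default.t1 (name string)
//   USING carbondata
//   OPTIONS (tableName "t1", dbName "default", tablePath "/store/default/t1",
//     path "file:///store/default/t1", isExternal "false", isTransactional "true",
//     isVisible "true", <serialized carbon schema>)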
sparkSession.sql(
s"""CREATE TABLE $dbName.$tableName
|(${ rawSchema })
|USING carbondata
|OPTIONS (${tableProperties}
| tableName "$tableName",
| dbName "$dbName",
| tablePath "$tablePath",
| path "${FileFactory.addSchemeIfNotExists(tablePath)}",
| isExternal "$isExternal",
| isTransactional "$isTransactionalTable",
| isVisible "$isVisible"
| $carbonSchemaString)
| $partitionString
""".stripMargin).collect()
}
} catch {
case e: AnalysisException =>
// concurrent drivers can race past the existence check above; the losing driver's
// CREATE TABLE then fails with a "table already exists" AnalysisException
if (e.getMessage().contains("already exists")) {
// Clear the cache first
CarbonEnv.getInstance(sparkSession).carbonMetaStore
.removeTableFromMetadata(dbName, tableName)
// Delete the folders created by this call if the actual path is different
val actualPath = CarbonEnv
.getCarbonTable(TableIdentifier(tableName, Option(dbName)))(sparkSession)
.getTablePath
if (!actualPath.equalsIgnoreCase(tablePath)) {
LOGGER.error(
s"Table already exists with path: $actualPath, so deleting $tablePath")
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(tablePath))
}
// No need to throw for create if not exists
if (ifNotExistsSet) {
LOGGER.error(e, e)
} else {
LOGGER.error(e)
throw e
}
} else {
LOGGER.error(e)
throw e
}
case e: Exception =>
// roll back: drop the partially created table so the failed create leaves no metadata
try {
CarbonEnv.getInstance(sparkSession).carbonMetaStore
.dropTable(tableIdentifier)(sparkSession)
} catch {
case _: Exception => // ignore rollback failures and rethrow the original exception
}
throw e
}
}
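// metadata creation succeeded: notify listeners through the post-execution event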
val createTablePostExecutionEvent: CreateTablePostExecutionEvent =
CreateTablePostExecutionEvent(sparkSession, tableIdentifier)
OperationListenerBus.getInstance.fireEvent(createTablePostExecutionEvent, operationContext)
}
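// DDL command: no rows to return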
Seq.empty
}