in integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala [83:471]
override def processData(sparkSession: SparkSession): Seq[Row] = {
if (isDeferredRefresh) {
throw new UnsupportedOperationException("DEFERRED REFRESH is not supported")
}
val databaseName = CarbonEnv.getDatabaseName(indexModel.dbName)(sparkSession)
indexModel.dbName = Some(databaseName)
val tableName = indexModel.tableName
val storePath = CarbonProperties.getStorePath
val dbLocation = CarbonEnv.getDatabaseLocation(databaseName, sparkSession)
val indexTableName = indexModel.indexName
val tablePath: String = if (isCreateSIndex) {
dbLocation + CarbonCommonConstants.FILE_SEPARATOR + indexTableName
} else {
tableProperties("tablePath")
}
setAuditTable(databaseName, indexTableName)
setAuditInfo(Map(
"Column names" -> indexModel.columnNames.toString(),
"Parent TableName" -> indexModel.tableName,
"SI Table Properties" -> tableProperties.toString()))
LOGGER.info(
s"Creating Index with Database name [$databaseName] and Index name [$indexTableName]")
val identifier = TableIdentifier(tableName, indexModel.dbName)
var carbonTable: CarbonTable = null
var locks: List[ICarbonLock] = List()
var oldIndexInfo = ""
try {
carbonTable = CarbonEnv.getCarbonTable(indexModel.dbName, tableName)(sparkSession)
if (carbonTable == null) {
throw new ErrorMessage(s"Parent Table $databaseName.$tableName is not found")
}
if (carbonTable != null &&
(carbonTable.isFileLevelFormat || !carbonTable.getTableInfo.isTransactionalTable)) {
throw new MalformedCarbonCommandException(
"Unsupported operation on non transactional table")
}
if (carbonTable.isStreamingSink) {
throw new ErrorMessage(
s"Parent Table ${ carbonTable.getDatabaseName }." +
s"${ carbonTable.getTableName }" +
s" is Streaming Table and Secondary index on Streaming table is not supported ")
}
if (carbonTable.isHivePartitionTable) {
val isPartitionColumn = indexModel.columnNames.exists {
siColumns => carbonTable.getTableInfo
.getFactTable
.getPartitionInfo
.getColumnSchemaList
.asScala
.exists(_.getColumnName.equalsIgnoreCase(siColumns))
}
if (isPartitionColumn) {
throw new UnsupportedOperationException(
"Secondary Index cannot be created on a partition column.")
}
}
locks = acquireLockForSecondaryIndexCreation(carbonTable.getAbsoluteTableIdentifier)
if (locks.isEmpty) {
throw new ErrorMessage(
s"Not able to acquire lock. Another Data Modification operation " +
s"is already in progress for either ${carbonTable.getDatabaseName}." +
s"${carbonTable.getTableName} or ${carbonTable.getDatabaseName} or " +
s"$indexTableName. Please try after some time")
}
// get carbon table again to reflect any changes during lock acquire.
carbonTable =
CarbonEnv.getInstance(sparkSession).carbonMetaStore
.lookupRelation(indexModel.dbName, tableName)(sparkSession)
.asInstanceOf[CarbonRelation].carbonTable
if (carbonTable == null) {
throw new ErrorMessage(s"Parent Table $databaseName.$tableName is not found")
}
// storePath = carbonTable.getTablePath
// check if index table being created is a stale index table for the same or other table
// in current database. Following cases are possible for checking stale scenarios
// Case1: table exists in hive but deleted in carbon
// Case2: table exists in carbon but deleted in hive
// Case3: table neither exists in hive nor in carbon but stale folders are present for the
// index table being created
val indexTables = CarbonIndexUtil.getSecondaryIndexes(carbonTable)
val indexTableExistsInCarbon = indexTables.asScala.contains(indexTableName)
val indexTableExistsInHive = sparkSession.sessionState.catalog
.tableExists(TableIdentifier(indexTableName, indexModel.dbName))
val isRegisterIndex = !isCreateSIndex
if (indexTableExistsInHive && isCreateSIndex) {
if (!ifNotExists) {
LOGGER.error(
s"Index creation with Database name [$databaseName] and index name " +
s"[$indexTableName] failed. " +
s"Index [$indexTableName] already exists under database [$databaseName]")
throw new ErrorMessage(
s"Index [$indexTableName] already exists under database [$databaseName]")
} else {
return Seq.empty
}
} else if (indexTableExistsInCarbon && !indexTableExistsInHive && isCreateSIndex) {
LOGGER.error(
s"Index with [$indexTableName] under database [$databaseName] is present in " +
s"stale state.")
throw new ErrorMessage(
s"Index with [$indexTableName] under database [$databaseName] is present in " +
s"stale state. Please use drop index if exists command to delete the index table")
} else if (!indexTableExistsInCarbon && !indexTableExistsInHive && isCreateSIndex) {
val indexTableStorePath = storePath + CarbonCommonConstants.FILE_SEPARATOR + databaseName +
CarbonCommonConstants.FILE_SEPARATOR + indexTableName
if (CarbonUtil.isFileExists(indexTableStorePath)) {
LOGGER.error(
s"Index with [$indexTableName] under database [$databaseName] is present in " +
s"stale state.")
throw new ErrorMessage(
s"Index with [$indexTableName] under database [$databaseName] is present in " +
s"stale state. Please use drop index if exists command to delete the index " +
s"table")
}
}
val dims = carbonTable.getVisibleDimensions.asScala
val msrs = carbonTable.getVisibleMeasures.asScala
.map(x => if (!x.isComplex) {
x.getColName
})
val dimNames = dims.map(x => if (DataTypes.isArrayType(x.getDataType) || !x.isComplex) {
x.getColName.toLowerCase()
})
val isMeasureColPresent = indexModel.columnNames.find(x => msrs.contains(x))
if (isMeasureColPresent.isDefined) {
throw new ErrorMessage(s"Secondary Index is not supported for measure column : ${
isMeasureColPresent
.get
}")
}
val properties = carbonTable.getTableInfo.getFactTable.getTableProperties.asScala
val spatialProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX)
if (spatialProperty.isDefined) {
if (indexModel.columnNames.exists(x => x.equalsIgnoreCase(spatialProperty.get.trim))) {
throw new ErrorMessage(s"Secondary Index is not supported for Spatial index column:" +
s" ${ spatialProperty.get.trim }")
}
}
// No. of index table cols are more than parent table key cols
if (indexModel.columnNames.size > dims.size) {
throw new ErrorMessage(s"Number of columns in Index table cannot be more than " +
"number of key columns in Source table")
}
if (indexModel.columnNames.exists(x => !dimNames.contains(x))) {
if (isRegisterIndex) {
throw new ErrorMessage(s"Cannot Register Secondary index table $indexTableName, " +
s"as it has column(s) which does not exists in $tableName. " +
s"Try Drop and recreate SI.")
}
throw new ErrorMessage(
s"one or more specified index cols either does not exist or not a key column or complex" +
s" column in table $databaseName.$tableName")
}
// Check for duplicate column names while creating index table
indexModel.columnNames.groupBy(col => col).foreach(f => if (f._2.size > 1) {
throw new ErrorMessage(s"Duplicate column name found : ${ f._1 }")
})
// Should not allow to create index on an index table
val isIndexTable = carbonTable.isIndexTable
if (isIndexTable) {
throw new ErrorMessage(
s"Table [$tableName] under database [$databaseName] is already an index table")
}
val absoluteTableIdentifier = AbsoluteTableIdentifier.
from(tablePath, databaseName, indexTableName)
val indexTablePath = CarbonTablePath
.getMetadataPath(absoluteTableIdentifier.getTablePath)
val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] =
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath,
carbonTable.getTableStatusVersion)
var siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
SegmentStatusManager.readLoadMetadata(indexTablePath)
if (isRegisterIndex) {
// check if SI segments are more than main table segments
CarbonInternalLoaderUtil
.checkMainTableSegEqualToSiSeg(mainTblLoadMetadataDetails,
siTblLoadMetadataDetails, isRegisterIndex)
// check if SI table has undergone any Update or delete operation, which can happen in
// case of compatibility scenario. IUD after Refresh SI and before register index
val updatedSegmentsCount = siTblLoadMetadataDetails.filter(loadMetaDetail =>
!loadMetaDetail.getUpdateStatusFileName.equals(""))
if (!updatedSegmentsCount.isEmpty) {
throw new ErrorMessage(s"Cannot Register Secondary index table $indexTableName" +
", as it has undergone update or delete operation. " +
"Try Drop and recreate SI.")
}
}
// creation of index on long string or binary columns are not supported
val errorMsg = "one or more index columns specified contains long string or binary column" +
s" in table $databaseName.$tableName. SI cannot be created on " +
s"long string or binary columns."
dims.filter(dimension => indexModel.columnNames
.contains(dimension.getColName))
.map(_.getDataType).foreach(dataType =>
if (dataType.equals(DataTypes.VARCHAR) || dataType.equals(DataTypes.BINARY)) {
throw new ErrorMessage(errorMsg)
})
// Check whether index table column order is same as another index table column order
oldIndexInfo = carbonTable.getIndexInfo
if (null == oldIndexInfo) {
oldIndexInfo = ""
}
val indexProperties = new util.HashMap[String, String]
val indexTableCols = indexModel.columnNames.asJava
indexProperties.put(CarbonCommonConstants.INDEX_COLUMNS, indexTableCols.asScala.mkString(","))
indexProperties.put(CarbonCommonConstants.INDEX_PROVIDER,
IndexType.SI.getIndexProviderName)
indexProperties.put(CarbonCommonConstants.INDEX_STATUS, IndexStatus.ENABLED.name())
val indexInfo = IndexTableUtil.checkAndAddIndexTable(
oldIndexInfo,
new IndexTableInfo(
databaseName, indexTableName,
indexProperties),
true)
var tableInfo: TableInfo = null
// if Register Index call then read schema file from the metastore
if (!isCreateSIndex && indexTableExistsInHive) {
tableInfo = SchemaReader.getTableInfo(absoluteTableIdentifier)
} else {
tableInfo = prepareTableInfo(
carbonTable, databaseName,
tableName, indexTableName, absoluteTableIdentifier)
}
if (isRegisterIndex && null != tableInfo.getFactTable.getSchemaEvolution &&
null != carbonTable.getTableInfo.getFactTable.getSchemaEvolution) {
// check if SI table has undergone any alter schema operation before registering it
val indexTableSchemaEvolutionEntryList = tableInfo
.getFactTable
.getSchemaEvolution
.getSchemaEvolutionEntryList
val mainTableSchemaEvolutionEntryList = carbonTable
.getTableInfo
.getFactTable
.getSchemaEvolution
.getSchemaEvolutionEntryList
if (indexTableSchemaEvolutionEntryList.size() > mainTableSchemaEvolutionEntryList.size()) {
val index = mainTableSchemaEvolutionEntryList.size()
for (i <- index until indexTableSchemaEvolutionEntryList.size()) {
val schemaEntry = indexTableSchemaEvolutionEntryList.get(i)
val isSITableRenamed =
(schemaEntry.getAdded == null && schemaEntry.getRemoved == null) ||
(schemaEntry.getAdded.isEmpty && schemaEntry.getRemoved.isEmpty)
if (!isSITableRenamed) {
throw new ErrorMessage(s"Cannot Register Secondary index table $indexTableName" +
", as it has undergone column schema addition or deletion. " +
"Try Drop and recreate SI.")
}
}
}
}
if (!isCreateSIndex && !indexTableExistsInHive) {
LOGGER.error(
s"Index registration with Database name [$databaseName] and index name " +
s"[$indexTableName] failed. " +
s"Index [$indexTableName] does not exists under database [$databaseName]")
throw new ErrorMessage(
s"Index [$indexTableName] does not exists under database [$databaseName]")
}
// Need to fill partitioner class when we support partition
val tableIdentifier = AbsoluteTableIdentifier
.from(tablePath, databaseName, indexTableName)
// Add Database to catalog and persist
val catalog = CarbonEnv.getInstance(sparkSession).carbonMetaStore
// val tablePath = tableIdentifier.getTablePath
val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tableIdentifier)
// set index information in index table
val indexTableMeta = new IndexMetadata(tableName, true, carbonTable.getTablePath)
tableInfo.getFactTable.getTableProperties
.put(tableInfo.getFactTable.getTableId, indexTableMeta.serialize)
// set index information in parent table
IndexTableUtil.addIndexInfoToParentTable(carbonTable,
IndexType.SI.getIndexProviderName,
indexTableName,
indexProperties)
val carbonRelation = CarbonSparkUtil.createCarbonRelation(tableInfo, tablePath)
val rawSchema = CarbonSparkUtil.getRawSchema(carbonRelation)
val operationContext = new OperationContext
val createTablePreExecutionEvent: CreateTablePreExecutionEvent =
CreateTablePreExecutionEvent(sparkSession, tableIdentifier, Option(tableInfo))
OperationListenerBus.getInstance.fireEvent(createTablePreExecutionEvent, operationContext)
// do not create index table for register table call
// only the alter the existing table to set index related info
if (isCreateSIndex) {
try {
sparkSession.sql(
s"""CREATE TABLE $databaseName.$indexTableName
|($rawSchema)
|USING carbondata OPTIONS (tableName "$indexTableName",
|dbName "$databaseName", tablePath "$tablePath", path "$tablePath",
|parentTablePath "${ carbonTable.getTablePath }", isIndexTable "true",
|parentTableId "${ carbonTable.getCarbonTableIdentifier.getTableId }",
|parentTableName "$tableName"$carbonSchemaString) """.stripMargin)
.collect()
} catch {
case e: IOException =>
if (FileFactory.isFileExist(tablePath)) {
val si_dir = FileFactory.getCarbonFile(tablePath)
CarbonUtil.deleteFoldersAndFilesSilent(si_dir)
}
throw e
}
} else {
sparkSession.sql(
s"""ALTER TABLE $databaseName.$indexTableName SET SERDEPROPERTIES (
'parentTableName'='$tableName', 'isIndexTable' = 'true', 'parentTablePath' =
'${carbonTable.getTablePath}',
'parentTableId' = '${carbonTable.getCarbonTableIdentifier.getTableId}')""")
.collect()
checkAndRemoveIndexTableExistsProperty(sparkSession, indexTableName)
// Refresh the index table
CarbonEnv
.getInstance(sparkSession)
.carbonMetaStore
.lookupRelation(indexModel.dbName, indexTableName)(sparkSession)
.asInstanceOf[CarbonRelation]
.carbonTable
}
CarbonIndexUtil.addIndexTableInfo(IndexType.SI.getIndexProviderName,
carbonTable,
indexTableName,
indexProperties)
CarbonHiveIndexMetadataUtil.refreshTable(databaseName, indexTableName, sparkSession)
sparkSession.sql(
s"""ALTER TABLE $databaseName.$tableName SET SERDEPROPERTIES ('indexInfo' =
|'$indexInfo')""".stripMargin).collect()
val tableIdent = TableIdentifier(tableName, Some(databaseName))
// modify the tableProperties of mainTable by adding "indexTableExists" property
CarbonIndexUtil
.addOrModifyTableProperty(
carbonTable,
Map("indexTableExists" -> "true"), needLock = false)(sparkSession)
CarbonHiveIndexMetadataUtil.refreshTable(databaseName, tableName, sparkSession)
// refresh the parent table relation
sparkSession.sessionState.catalog.refreshTable(identifier)
// load data for secondary index
if (isCreateSIndex) {
LoadDataForSecondaryIndex(indexModel).run(sparkSession)
}
val createTablePostExecutionEvent: CreateTablePostExecutionEvent =
CreateTablePostExecutionEvent(sparkSession, tableIdentifier)
OperationListenerBus.getInstance.fireEvent(createTablePostExecutionEvent, operationContext)
LOGGER.info(
s"Index created with Database name [$databaseName] and Index name [$indexTableName]")
} catch {
case err@(_: ErrorMessage | _: IndexTableExistException) =>
if (err.getMessage.contains("Index Table with selected columns already exist") &&
!isCreateSIndex) {
checkAndRemoveIndexTableExistsProperty(sparkSession, indexTableName)
LOGGER.warn(s"Table [$indexTableName] has been already registered as Secondary " +
s"Index table with table [$databaseName.${ indexModel.tableName }].")
} else {
sys.error(err.getMessage)
}
case ex@(_: IOException | _: ParseException) =>
LOGGER.error(s"Index creation with Database name [$databaseName] " +
s"and Index name [$indexTableName] is failed")
throw ex
case e: Exception =>
LOGGER.error(s"Index creation with Database name [$databaseName] " +
s"and Index name [$indexTableName] is Successful, But the data load to index" +
s" table is failed")
throw e
}
finally {
if (locks.nonEmpty) {
releaseLocks(locks)
}
}
Seq.empty
}