in integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/DropIndexCommand.scala [76:220]
/**
 * Drops the index `indexName` of this command.
 *
 * For a secondary index the index table is dropped through [[CarbonInternalMetastore]] and the
 * parent table is flagged with `indexTableExists = false` once it has no secondary index left.
 * If the index table itself cannot be found, only the reference held by the parent table is
 * removed. For any other index the index info is removed from the parent table, which is then
 * flagged with `indexExists = false` when neither a bloom filter nor a lucene index remains.
 *
 * Every lock collected in `carbonLocks` is released in the `finally` block; the index table
 * directory is deleted only after all of them were released successfully.
 *
 * @param isSecondaryIndex true to drop a secondary index table, false to drop any other index
 * @param sparkSession     session used to resolve the database, the tables and the metastore
 * @return always an empty sequence
 * @throws NoSuchIndexException if the index does not exist and `ifExistsSet` is false, or if
 *                              the index belongs to a parent table other than the requested one
 * @throws NoSuchTableException if the parent table lookup fails and `ifExistsSet` is false
 * @throws RuntimeException     if `indexName` resolves to a table that is not an index table
 */
def dropIndex(isSecondaryIndex: Boolean, sparkSession: SparkSession): Seq[Row] = {
  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
  val dbName = CarbonEnv.getDatabaseName(dbNameOp)(sparkSession)
  val locksToBeAcquired = if (needLock) {
    List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
  } else {
    List.empty
  }
  val catalog = CarbonEnv.getInstance(sparkSession).carbonMetaStore
  // flag to check if folders and files can be successfully deleted
  var isValidDeletion = false
  val carbonLocks: scala.collection.mutable.ArrayBuffer[ICarbonLock] = ArrayBuffer[ICarbonLock]()
  // assigned inside the try so that the finally block can see which table was resolved
  var carbonTable: Option[CarbonTable] = None
  try {
    if (isSecondaryIndex) {
      LOGGER.info(s"dropping index table $indexName")
      carbonTable =
        try {
          Some(CarbonEnv.getCarbonTable(Some(dbName), indexName)(sparkSession))
        } catch {
          case _: NoSuchTableException =>
            // even if the index table does not exist, check if the parent table exists and
            // remove the index table reference in case the parent still holds it
            val staleReferenceRemoved =
              try {
                val parentCarbonTable = getParentTableFromCatalog(sparkSession, dbName, catalog)
                val indexTableList = CarbonIndexUtil.getSecondaryIndexes(parentCarbonTable.get)
                if (!indexTableList.isEmpty) {
                  removeIndexInfoFromParentTable(sparkSession,
                    dbName,
                    locksToBeAcquired,
                    carbonLocks,
                    parentCarbonTable.get)
                  true
                } else {
                  false
                }
              } catch {
                case ex: NoSuchTableException =>
                  if (!ifExistsSet) {
                    throw ex
                  }
                  false
              }
            if (!ifExistsSet && !staleReferenceRemoved) {
              throw new NoSuchIndexException(indexName)
            }
            None
        }
      carbonTable.foreach { indexTable =>
        CarbonInternalMetastore.refreshIndexInfo(dbName, indexName, indexTable)(sparkSession)
        val isIndexTableBool = indexTable.isIndexTable
        // name of the parent table as recorded on the index table itself; deliberately NOT
        // called parentTableName so that it does not shadow the parent requested by the command
        val indexParentName = CarbonIndexUtil.getParentTableName(indexTable)
        val parentTable = CarbonEnv.getCarbonTable(Some(dbName), indexParentName)(sparkSession)
        if (!isIndexTableBool) {
          sys.error(s"Drop Index command is not permitted on table [$dbName.$indexName]")
        }
        // The index must belong to the parent table named in the command. The check is skipped
        // when the command carries no parent table name (null).
        // NOTE(review): this is raised even when ifExistsSet is true - confirm that is desired.
        val belongsToOtherParent =
          Option(parentTableName).exists(requested => !requested.equalsIgnoreCase(indexParentName))
        if (belongsToOtherParent) {
          throw new NoSuchIndexException(indexName)
        }
        // locks for dropping an index table are taken on the parent table
        val lockIdentifier = parentTable.getAbsoluteTableIdentifier
        locksToBeAcquired foreach { lock =>
          carbonLocks += CarbonLockUtil.getLockObject(lockIdentifier, lock)
        }
        // NOTE(review): the flag is raised before dropIndexTable runs, so the table directory is
        // still deleted in the finally block when dropIndexTable fails - confirm this is intended.
        isValidDeletion = true
        // drop carbon table
        CarbonInternalMetastore.dropIndexTable(TableIdentifier(indexName, Some(dbName)),
          indexTable,
          indexTable.getTablePath,
          parentTable,
          removeEntryFromParentTable = true)(sparkSession)
        // take the refreshed table object after dropping and updating the index table
        val refreshedParent = getRefreshedParentTable(sparkSession, dbName)
        val indexTables = CarbonIndexUtil.getSecondaryIndexes(refreshedParent)
        // if all the indexes are dropped then the main table holds no index tables,
        // so change the "indexTableExists" property to false, iff all the indexes are deleted
        if (null == indexTables || indexTables.isEmpty) {
          CarbonIndexUtil
            .addOrModifyTableProperty(refreshedParent,
              Map("indexTableExists" -> "false"), needLock = false)(sparkSession)
          CarbonHiveIndexMetadataUtil.refreshTable(dbName, indexParentName, sparkSession)
        }
      }
    } else {
      // remove cg or fg index
      val staleParent = getParentTableFromCatalog(sparkSession, dbName, catalog).get
      removeIndexInfoFromParentTable(sparkSession,
        dbName,
        locksToBeAcquired,
        carbonLocks,
        staleParent)
      val refreshedParent = getRefreshedParentTable(sparkSession, dbName)
      // check if any CG or FG index exists. If not exists,
      // then set indexExists as false to return empty index list for next query.
      val hasCgFgIndexes = Option(refreshedParent.getIndexMetadata)
        .flatMap(metadata => Option(metadata.getIndexesMap))
        .exists { indexes =>
          indexes.containsKey(IndexType.BLOOMFILTER.getIndexProviderName) ||
          indexes.containsKey(IndexType.LUCENE.getIndexProviderName)
        }
      if (!hasCgFgIndexes) {
        CarbonIndexUtil
          .addOrModifyTableProperty(refreshedParent,
            Map("indexExists" -> "false"), needLock = false)(sparkSession)
        CarbonHiveIndexMetadataUtil.refreshTable(dbName, parentTableName, sparkSession)
      }
    }
  } finally {
    if (carbonLocks.nonEmpty) {
      // unlock every lock before looking at the results: a plain forall(_.unlock()) stops at the
      // first failure and would leave the remaining locks held
      val unlockResults = carbonLocks.map(_.unlock())
      if (unlockResults.forall(identity)) {
        logInfo("Table MetaData Unlocked Successfully")
        if (isValidDeletion) {
          carbonTable.foreach(table => CarbonInternalMetastore.deleteTableDirectory(table))
        }
      } else {
        logError("Table metadata unlocking is unsuccessful, index table may be in stale state")
      }
    }
  }
  Seq.empty
}