in integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/hive/CarbonInternalMetastore.scala [151:294]
  def refreshIndexInfo(dbName: String, tableName: String,
      carbonTable: CarbonTable, needLock: Boolean = true)(sparkSession: SparkSession): Unit = {
    // check whether a secondary index table exists
    val indexTableExists = CarbonIndexUtil.isIndexTableExists(carbonTable)
    // check whether CG and FG indexes exist
    val indexExists = CarbonIndexUtil.isIndexExists(carbonTable)
    // In case of a non-transactional table, there is no need to change the table schema.
    if (!carbonTable.isTransactionalTable) {
      return
    }
    // Tables created without the "indexTableExists" property return null for it. For such
    // tables, enter the block below, gather the actual index data from hive and set the
    // property to true/false; once the property has a value, decisions are based on it.
    if (null != carbonTable && (null == indexTableExists || indexTableExists.toBoolean)) {
      // When index information is not loaded in the main table, fetch the index info from the
      // hive metastore and set it in the carbon table.
      val indexTableMap =
        new ConcurrentHashMap[String, java.util.Map[String, java.util.Map[String, String]]]
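      // The hive lookup below returns whether this table is itself an index table, its parent
      // table name/path/id, the serialized index info and the stored schema.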
      try {
        val (isIndexTable, parentTableName, indexInfo, parentTablePath, parentTableId, schema) =
          indexInfoFromHive(dbName, tableName)(sparkSession)
        if (isIndexTable.equals("true")) {
          val indexMeta = new IndexMetadata(indexTableMap,
            parentTableName,
            true,
            parentTablePath,
            parentTableId)
          carbonTable.getTableInfo.getFactTable.getTableProperties
            .put(carbonTable.getCarbonTableIdentifier.getTableId, indexMeta.serialize)
        } else {
          IndexTableInfo.fromGson(indexInfo)
            .foreach { indexTableInfo =>
              var indexProperties = indexTableInfo.getIndexProperties
              val indexProvider = if (null != indexProperties) {
                indexProperties.get(CarbonCommonConstants.INDEX_PROVIDER)
              } else {
                // If the SI table was created before the change for CARBONDATA-3765,
                // indexProperties will not be present. On a direct upgrade of such an SI store
                // it is null, so build indexProperties from the index columns instead.
                indexProperties = new java.util.HashMap[String, String]()
                indexProperties.put(CarbonCommonConstants.INDEX_COLUMNS,
                  indexTableInfo.getIndexCols.asScala.mkString(","))
                val provider = IndexType.SI.getIndexProviderName
                indexProperties.put(CarbonCommonConstants.INDEX_PROVIDER, provider)
                provider
              }
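              // group the index tables provider-wise: the inner map holds
              // index table name -> index properties for each provider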
              if (null == indexTableMap.get(indexProvider)) {
                val indexTableInfoMap =
                  new java.util.HashMap[String, java.util.Map[String, String]]
                indexTableInfoMap.put(indexTableInfo.getTableName, indexProperties)
                indexTableMap.put(indexProvider, indexTableInfoMap)
              } else {
                indexTableMap.get(indexProvider).put(indexTableInfo.getTableName, indexProperties)
              }
            }
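          // serialize the assembled index metadata and cache it in the fact table properties,
          // keyed by the table id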
          val indexMetadata = new IndexMetadata(
            indexTableMap,
            parentTableName,
            isIndexTable.toBoolean,
            parentTablePath, parentTableId)
          carbonTable.getTableInfo.getFactTable.getTableProperties
            .put(carbonTable.getCarbonTableIdentifier.getTableId, indexMetadata.serialize)
        }
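        // If the "indexTableExists" property was never set on this table (and the table is not
        // itself an index table), derive it now and persist it along with "indexExists".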
        if (null == indexTableExists && !isIndexTable.equals("true")) {
          val indexTables = CarbonIndexUtil.getSecondaryIndexes(carbonTable)
          val tableIdentifier = new TableIdentifier(carbonTable.getTableName,
            Some(carbonTable.getDatabaseName))
          // modify the tableProperties of the main table: "indexTableExists" records whether
          // any secondary index table was found for this table, and "indexExists" is set to
          // false only when it was never set before
          val indexTableExistsValue = if (indexTables.isEmpty) "false" else "true"
          val indexExistsValue = if (null == indexExists) "false" else "true"
          CarbonIndexUtil
            .addOrModifyTableProperty(carbonTable,
              Map("indexTableExists" -> indexTableExistsValue, "indexExists" -> indexExistsValue),
              needLock)(sparkSession)
        }
      } catch {
        case e: Exception =>
          // In case of creating a table, the hive table will not be available yet.
          LOGGER.error(e.getMessage)
      }
    }
    // add CG and FG index info to the table properties
    if (null != indexExists) {
      if (null != carbonTable && indexExists.toBoolean) {
        val indexTableMap =
          new ConcurrentHashMap[String, java.util.Map[String, java.util.Map[String, String]]]
        val (isIndexTable, parentTableName, indexInfo, parentTablePath, parentTableId, schema) =
          indexInfoFromHive(dbName, tableName)(sparkSession)
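        // an empty JSON array means no CG/FG index is registered in hive for this table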
        if (!indexInfo.equalsIgnoreCase("[]")) {
          IndexTableInfo.fromGson(indexInfo)
            .foreach { indexTableInfo =>
              val indexProvider = indexTableInfo.getIndexProperties
                .get(CarbonCommonConstants.INDEX_PROVIDER)
              if (null == indexTableMap.get(indexProvider)) {
                val indexTableInfoMap =
                  new java.util.HashMap[String, java.util.Map[String, String]]
                indexTableInfoMap.put(indexTableInfo.getTableName,
                  indexTableInfo.getIndexProperties)
                indexTableMap.put(indexProvider, indexTableInfoMap)
              } else {
                indexTableMap.get(indexProvider).put(indexTableInfo.getTableName,
                  indexTableInfo.getIndexProperties)
              }
            }
          val indexMetadata = new IndexMetadata(
            indexTableMap,
            parentTableName,
            isIndexTable.toBoolean,
            parentTablePath, parentTableId)
          carbonTable.getTableInfo.getFactTable.getTableProperties
            .put(carbonTable.getCarbonTableIdentifier.getTableId, indexMetadata.serialize)
        }
      }
    }
  }