in integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala [156:366]
/**
 * Creates a materialized view end-to-end: validates the MV query and all related (parent)
 * tables, creates the hidden backing table for the view, builds the [[MVSchema]] metadata,
 * and registers it with the view manager. If schema registration fails, the backing table
 * is dropped again (compensating rollback) and the original exception is rethrown.
 *
 * @param session       active SparkSession used to parse/plan the query and run DDL commands
 * @param tableIdentifier identifier (db + name) of the MV table to create; the database part
 *                        is read with `.get` below, so callers are expected to have resolved it
 * @param viewManager   MV manager used to persist the schema
 * @param viewCatalog   MV catalog used to detect an existing MV with the same query
 * @return the fully-populated MVSchema that was registered
 */
private def doCreate(session: SparkSession,
tableIdentifier: TableIdentifier,
viewManager: MVManagerInSpark,
viewCatalog: MVCatalogInSpark): MVSchema = {
// Parse the MV query and strip the dummy function wrapper inserted by the parser.
var logicalPlan = MVHelper.dropDummyFunction(
MVQueryParser.getQueryPlan(queryString, session))
val relatedTables = getRelatedTables(logicalPlan)
// Incremental refresh is only possible when the query shape supports it AND every
// related table is a Carbon table; otherwise fall back to full refresh.
val viewRefreshMode = if (checkIsQueryNeedFullRefresh(logicalPlan) ||
checkIsHasNonCarbonTable(relatedTables)) {
MVProperty.REFRESH_MODE_FULL
} else {
MVProperty.REFRESH_MODE_INCREMENTAL
}
var modifiedQueryString = queryString
if (viewRefreshMode.equalsIgnoreCase(MVProperty.REFRESH_MODE_INCREMENTAL) &&
checkIfAvgAggregatePresent(logicalPlan)) {
// Check if average aggregate is used and derive logical plan from modified query string.
// (avg must be rewritten, e.g. into sum/count, to be incrementally maintainable.)
modifiedQueryString = MVQueryParser.checkForAvgAndModifySql(queryString)
logicalPlan = MVHelper.dropDummyFunction(
MVQueryParser.getQueryPlan(modifiedQueryString, session))
}
// check if mv with same query already exists; duplicate queries are rejected outright.
val mvSchemaWrapper = viewCatalog.getMVWithSameQueryPresent(logicalPlan)
if (mvSchemaWrapper.nonEmpty) {
val mvWithSameQuery = mvSchemaWrapper.get.viewSchema.getIdentifier.getTableName
throw new MalformedMVCommandException(
s"MV with the name `$mvWithSameQuery` has been already created with the same query")
}
// Modularize/validate the plan and derive the MV's output schema and input column names.
val modularPlan = checkQuery(logicalPlan)
val viewSchema = getOutputSchema(logicalPlan)
val relatedTableList = toCarbonTables(session, relatedTables)
val inputCols = logicalPlan.output.map(x =>
x.name
).toList
val relatedTableNames = new util.ArrayList[String](relatedTableList.size())
// Check if load is in progress in any of the parent table mapped to the indexSchema
// Each parent table must also be transactional, not itself an MV, not a streaming sink,
// and must not expose a spatial index column through the MV projection.
relatedTableList.asScala.foreach {
table =>
val tableProperties = table.getTableInfo.getFactTable.getTableProperties.asScala
// validate for spatial index column
val spatialProperty = tableProperties.get(CarbonCommonConstants.SPATIAL_INDEX)
if (spatialProperty.isDefined) {
val spatialColumn = spatialProperty.get.trim
if (inputCols.contains(spatialColumn)) {
val errorMessage =
s"$spatialColumn is a spatial index column and is not allowed for " +
s"the option(s): MATERIALIZED VIEW"
throw new MalformedCarbonCommandException(errorMessage)
}
}
if (!table.getTableInfo.isTransactionalTable) {
throw new MalformedCarbonCommandException(
"Cannot create mv on non-transactional table")
}
if (table.isMV) {
throw new MalformedCarbonCommandException(
"Cannot create mv on mv table " + table.getTableUniqueName)
}
if (table.isStreamingSink) {
throw new MalformedCarbonCommandException(
"Cannot create mv on stream table " + table.getTableUniqueName)
}
if (SegmentStatusManager.isLoadInProgressInTable(table)) {
throw new UnsupportedOperationException(
"Cannot create mv when insert is in progress on table " + table.getTableUniqueName)
}
relatedTableNames.add(table.getTableName)
}
// WITH DEFERRED REFRESH forces manual triggering; otherwise honor the user property,
// defaulting to refresh-on-commit.
val viewRefreshTriggerMode = if (deferredRefresh) {
MVProperty.REFRESH_TRIGGER_MODE_ON_MANUAL
} else {
properties.getOrElse(MVProperty.REFRESH_TRIGGER_MODE,
MVProperty.REFRESH_TRIGGER_MODE_ON_COMMIT)
}
val viewProperties = mutable.Map[String, String]()
viewProperties.put(
CarbonCommonConstants.MV_RELATED_TABLES, relatedTableNames.asScala.mkString(","))
// Detect time-series usage (timeseries() udf column + granularity) in the query.
val (timeSeriesColumn, granularity) = checkTimeSeriesQuery(logicalPlan, viewRefreshTriggerMode)
val fieldsMap = MVHelper.getFieldsMapFromPlan(
new SQLBuilder(modularPlan).SQLizer.execute(modularPlan), getLogicalRelation(logicalPlan))
// If MV is mapped to single main table, then inherit table properties from main table,
// else, will use default table properties. If DMProperties contains table properties, then
// table properties of indexSchema table will be updated
if (relatedTableList.size() == 1 && CarbonSource.isCarbonDataSource(relatedTables.head)) {
inheritTablePropertiesFromRelatedTable(
relatedTableList.get(0),
fieldsMap,
viewSchema,
viewProperties)
if (granularity != null) {
// NOTE(review): `.head` assumes the time-series column name always matches a column
// of the single parent table — presumably guaranteed by checkTimeSeriesQuery; verify.
val timeSeriesDataType = relatedTableList.get(0).getTableInfo.getFactTable
.getListOfColumns.asScala
.filter(column => column.getColumnName.equalsIgnoreCase(timeSeriesColumn))
.head.getDataType
if (timeSeriesDataType.equals(DataTypes.DATE) ||
timeSeriesDataType.equals(DataTypes.TIMESTAMP)) {
// if data type is of Date type, then check if given granularity is valid for date type
if (timeSeriesDataType.equals(DataTypes.DATE)) {
checkTimeSeriesGranularityForDate(granularity)
}
} else {
throw new MalformedCarbonCommandException(
"TimeSeries Column must be of TimeStamp or Date type")
}
}
}
// User-supplied properties override anything inherited above.
properties.foreach(t => viewProperties.put(t._1, t._2))
// TODO mv table support partition
// Inherit partition from related table if mv is mapped to single related table
// (can be disabled via the "partitioning" property, which defaults to true).
val viewPartitionerFields = if (relatedTableList.size() == 1) {
val relatedTablePartitionColumns =
if (properties.getOrElse("partitioning", "true").toBoolean &&
relatedTableList.get(0).isHivePartitionTable) {
relatedTableList.get(0).getPartitionInfo
.getColumnSchemaList.asScala.map(_.getColumnName)
} else {
Seq.empty
}
getViewPartitionerFields(relatedTablePartitionColumns, fieldsMap)
} else {
Seq.empty
}
// Record the original column order only when partitioning is inherited (partition
// columns may be reordered in the created table).
val columnOrderMap = new java.util.HashMap[Integer, String]()
if (viewPartitionerFields.nonEmpty) {
viewSchema.zipWithIndex.foreach {
case (viewField, index) =>
columnOrderMap.put(index, viewField.column)
}
}
// prepare table model of the collected tokens
val viewTableModel: TableModel = CarbonParserUtil.prepareTableModel(
ifNotExistPresent = ifNotExistsSet,
CarbonParserUtil.convertDbNameToLowerCase(tableIdentifier.database),
tableIdentifier.table.toLowerCase,
viewSchema,
viewPartitionerFields,
viewProperties,
None,
isAlterFlow = false,
None)
val viewTablePath = if (viewProperties.contains("path")) {
viewProperties("path")
} else {
CarbonEnv.getTablePath(viewTableModel.databaseNameOp, viewTableModel.tableName)(session)
}
// Create the backing table; isVisible = false hides it from normal table listings.
CarbonCreateTableCommand(TableNewProcessor(viewTableModel),
viewTableModel.ifNotExistsSet, Some(viewTablePath), isVisible = false).run(session)
// Build and create mv schema
// Map list of main table columns mapped to MV table and add to indexSchema
val relatedTableToColumnsMap = new java.util.HashMap[String, util.Set[String]]()
for (viewField <- fieldsMap.values) {
viewField.relatedFieldList.foreach {
relatedField =>
if (null == relatedTableToColumnsMap.get(relatedField.tableName)) {
val columns = new util.HashSet[String]()
columns.add(relatedField.fieldName.toLowerCase())
relatedTableToColumnsMap.put(relatedField.tableName, columns)
} else {
relatedTableToColumnsMap.get(relatedField.tableName)
.add(relatedField.fieldName.toLowerCase())
}
}
}
// Identify each parent table by db/name/path/provider for the schema's lineage list.
// NOTE(review): `table.provider.get` assumes every related table has a provider set —
// presumably true for tables reaching this point; confirm against getRelatedTables.
val relatedTableIds = relatedTables.map { table =>
val relatedTableId = new RelationIdentifier(table.database, table.identifier.table, "")
relatedTableId.setTablePath(FileFactory.getUpdatedFilePath(table.location.toString))
relatedTableId.setProvider(table.provider.get)
relatedTableId
}
// NOTE(review): `tableIdentifier.database.get` assumes the caller resolved the database
// name before invoking doCreate — confirm; a None here would throw NoSuchElementException.
val viewIdentifier = new RelationIdentifier(
tableIdentifier.database.get, tableIdentifier.table,
CarbonEnv.getCarbonTable(tableIdentifier)(session).getTableId)
viewIdentifier.setTablePath(viewTablePath)
// Assemble the schema object that will be persisted by the view manager.
val schema = new MVSchema(viewManager)
schema.setIdentifier(viewIdentifier)
schema.setProperties(mutable.Map[String, String](viewProperties.toSeq: _*).asJava)
schema.setRelatedTableColumnList(relatedTableToColumnsMap)
schema.setColumnsOrderMap(columnOrderMap)
schema.setRelatedTables(new util.ArrayList[RelationIdentifier](relatedTableIds.asJava))
schema.getProperties.put(MVProperty.REFRESH_MODE, viewRefreshMode)
schema.getProperties.put(MVProperty.REFRESH_TRIGGER_MODE, viewRefreshTriggerMode)
if (null != granularity && null != timeSeriesColumn) {
schema.setTimeSeries(true)
}
schema.setQuery(queryString)
// Only incremental-refresh MVs keep the rewritten (avg-expanded) query.
if (!viewRefreshMode.equals(MVProperty.REFRESH_MODE_FULL)) {
schema.setModifiedQuery(modifiedQueryString)
}
try {
viewManager.createSchema(schema.getIdentifier.getDatabaseName, schema)
} catch {
// Rollback: drop the backing table created above, then rethrow the original failure.
case exception: Exception =>
val dropTableCommand = CarbonDropTableCommand(
ifExistsSet = true,
Option(schema.getIdentifier.getDatabaseName),
schema.getIdentifier.getTableName,
dropChildTable = true,
isInternalCall = true)
dropTableCommand.run(session)
throw exception
}
schema
}