private def doCreate()

in integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala [156:366]


  /**
   * Creates the backing table for a materialized view and registers its [[MVSchema]].
   *
   * Steps, in order:
   *  1. Build the logical plan from `queryString` and choose the refresh mode: full when
   *     `checkIsQueryNeedFullRefresh` or `checkIsHasNonCarbonTable` is true, incremental
   *     otherwise. For incremental refresh with an average aggregate, the query is rewritten
   *     through `MVQueryParser.checkForAvgAndModifySql` and the plan is rebuilt from it.
   *  2. Reject the request when the catalog already holds an MV with the same query.
   *  3. Validate every related table and collect the view properties.
   *  4. Create the MV table through `CarbonCreateTableCommand` with `isVisible = false`.
   *  5. Build the schema and register it with `viewManager`. When registration fails, the table
   *     created in step 4 is dropped and the registration failure is rethrown.
   *
   * Besides its parameters, this method reads `queryString`, `properties`, `deferredRefresh` and
   * `ifNotExistsSet` from the enclosing scope.
   *
   * @param session         spark session used for planning and for running the table commands
   * @param tableIdentifier identifier of the MV table; `database` must be defined because it is
   *                        read with `.get` when the view identifier is built
   * @param viewManager     manager used to construct and persist the schema
   * @param viewCatalog     catalog consulted for an existing MV with the same query
   * @return the schema that was registered
   * @throws MalformedMVCommandException     when an MV with the same query already exists
   * @throws MalformedCarbonCommandException when a related table or the time series column
   *                                         fails validation
   * @throws UnsupportedOperationException   when a load is in progress on a related table
   */
  private def doCreate(session: SparkSession,
      tableIdentifier: TableIdentifier,
      viewManager: MVManagerInSpark,
      viewCatalog: MVCatalogInSpark): MVSchema = {
    var logicalPlan = MVHelper.dropDummyFunction(
      MVQueryParser.getQueryPlan(queryString, session))
    val relatedTables = getRelatedTables(logicalPlan)
    val viewRefreshMode = if (checkIsQueryNeedFullRefresh(logicalPlan) ||
                              checkIsHasNonCarbonTable(relatedTables)) {
      MVProperty.REFRESH_MODE_FULL
    } else {
      MVProperty.REFRESH_MODE_INCREMENTAL
    }
    var modifiedQueryString = queryString
    if (viewRefreshMode.equalsIgnoreCase(MVProperty.REFRESH_MODE_INCREMENTAL) &&
      checkIfAvgAggregatePresent(logicalPlan)) {
      // Check if average aggregate is used and derive logical plan from modified query string.
      modifiedQueryString = MVQueryParser.checkForAvgAndModifySql(queryString)
      logicalPlan = MVHelper.dropDummyFunction(
        MVQueryParser.getQueryPlan(modifiedQueryString, session))
    }
    // check if mv with same query already exists
    val mvSchemaWrapper = viewCatalog.getMVWithSameQueryPresent(logicalPlan)
    if (mvSchemaWrapper.nonEmpty) {
      val mvWithSameQuery = mvSchemaWrapper.get.viewSchema.getIdentifier.getTableName
      throw new MalformedMVCommandException(
        s"MV with the name `$mvWithSameQuery` has been already created with the same query")
    }
    val modularPlan = checkQuery(logicalPlan)
    val viewSchema = getOutputSchema(logicalPlan)
    val relatedTableList = toCarbonTables(session, relatedTables)
    val inputCols = logicalPlan.output.map(_.name).toList
    val relatedTableNames = new util.ArrayList[String](relatedTableList.size())
    // Validate every related table of the MV and collect its name
    relatedTableList.asScala.foreach {
      table =>
        val tableProperties = table.getTableInfo.getFactTable.getTableProperties.asScala
        // validate for spatial index column
        val spatialProperty = tableProperties.get(CarbonCommonConstants.SPATIAL_INDEX)
        if (spatialProperty.isDefined) {
          val spatialColumn = spatialProperty.get.trim
          if (inputCols.contains(spatialColumn)) {
            val errorMessage =
              s"$spatialColumn is a spatial index column and is not allowed for " +
              s"the option(s): MATERIALIZED VIEW"
            throw new MalformedCarbonCommandException(errorMessage)
          }
        }
        if (!table.getTableInfo.isTransactionalTable) {
          throw new MalformedCarbonCommandException(
            "Cannot create mv on non-transactional table")
        }
        if (table.isMV) {
          throw new MalformedCarbonCommandException(
            "Cannot create mv on mv table " + table.getTableUniqueName)
        }
        if (table.isStreamingSink) {
          throw new MalformedCarbonCommandException(
            "Cannot create mv on stream table " + table.getTableUniqueName)
        }
        // Check if load is in progress in the related table
        if (SegmentStatusManager.isLoadInProgressInTable(table)) {
          throw new UnsupportedOperationException(
            "Cannot create mv when insert is in progress on table " + table.getTableUniqueName)
        }
        relatedTableNames.add(table.getTableName)
    }
    // A deferred refresh always uses the manual trigger; otherwise the user property decides,
    // falling back to on-commit.
    val viewRefreshTriggerMode = if (deferredRefresh) {
      MVProperty.REFRESH_TRIGGER_MODE_ON_MANUAL
    } else {
      properties.getOrElse(MVProperty.REFRESH_TRIGGER_MODE,
        MVProperty.REFRESH_TRIGGER_MODE_ON_COMMIT)
    }

    val viewProperties = mutable.Map[String, String]()
    viewProperties.put(
      CarbonCommonConstants.MV_RELATED_TABLES, relatedTableNames.asScala.mkString(","))

    val (timeSeriesColumn, granularity) = checkTimeSeriesQuery(logicalPlan, viewRefreshTriggerMode)

    val fieldsMap = MVHelper.getFieldsMapFromPlan(
      new SQLBuilder(modularPlan).SQLizer.execute(modularPlan), getLogicalRelation(logicalPlan))
    // If the MV is mapped to a single carbon related table, inherit the table properties from
    // that table. The user supplied properties are applied afterwards, so they overwrite any
    // inherited property with the same key.
    if (relatedTableList.size() == 1 && CarbonSource.isCarbonDataSource(relatedTables.head)) {
      val relatedTable = relatedTableList.get(0)
      inheritTablePropertiesFromRelatedTable(
        relatedTable,
        fieldsMap,
        viewSchema,
        viewProperties)
      if (granularity != null) {
        // Report a missing column as a malformed command instead of letting `head` on an empty
        // collection surface as an opaque NoSuchElementException.
        val timeSeriesDataType = relatedTable.getTableInfo.getFactTable
          .getListOfColumns.asScala
          .find(column => column.getColumnName.equalsIgnoreCase(timeSeriesColumn))
          .map(_.getDataType)
          .getOrElse(throw new MalformedCarbonCommandException(
            "TimeSeries column " + timeSeriesColumn + " does not exist in table " +
            relatedTable.getTableUniqueName))
        if (timeSeriesDataType.equals(DataTypes.DATE)) {
          // if data type is of Date type, then check if given granularity is valid for date type
          checkTimeSeriesGranularityForDate(granularity)
        } else if (!timeSeriesDataType.equals(DataTypes.TIMESTAMP)) {
          throw new MalformedCarbonCommandException(
            "TimeSeries Column must be of TimeStamp or Date type")
        }
      }
    }
    properties.foreach(t => viewProperties.put(t._1, t._2))

    // TODO mv table support partition
    // Inherit partition from related table if mv is mapped to single related table
    val viewPartitionerFields = if (relatedTableList.size() == 1) {
      val relatedTablePartitionColumns =
        if (properties.getOrElse("partitioning", "true").toBoolean &&
          relatedTableList.get(0).isHivePartitionTable) {
          relatedTableList.get(0).getPartitionInfo
            .getColumnSchemaList.asScala.map(_.getColumnName)
        } else {
          Seq.empty
        }
      getViewPartitionerFields(relatedTablePartitionColumns, fieldsMap)
    } else {
      Seq.empty
    }

    // The column order is recorded only when the view has partition fields
    val columnOrderMap = new java.util.HashMap[Integer, String]()
    if (viewPartitionerFields.nonEmpty) {
      viewSchema.zipWithIndex.foreach {
        case (viewField, index) =>
          columnOrderMap.put(index, viewField.column)
      }
    }

    // prepare table model of the collected tokens
    val viewTableModel: TableModel = CarbonParserUtil.prepareTableModel(
      ifNotExistPresent = ifNotExistsSet,
      CarbonParserUtil.convertDbNameToLowerCase(tableIdentifier.database),
      tableIdentifier.table.toLowerCase,
      viewSchema,
      viewPartitionerFields,
      viewProperties,
      None,
      isAlterFlow = false,
      None)

    // An explicit "path" property wins; the default table path is computed only when needed
    val viewTablePath = viewProperties.getOrElse("path",
      CarbonEnv.getTablePath(viewTableModel.databaseNameOp, viewTableModel.tableName)(session))
    CarbonCreateTableCommand(TableNewProcessor(viewTableModel),
      viewTableModel.ifNotExistsSet, Some(viewTablePath), isVisible = false).run(session)

    // Build and create mv schema
    // Collect, per related table, the lower-cased names of its columns that the MV fields map to
    val relatedTableToColumnsMap = new java.util.HashMap[String, util.Set[String]]()
    for (viewField <- fieldsMap.values) {
      viewField.relatedFieldList.foreach {
        relatedField =>
          if (null == relatedTableToColumnsMap.get(relatedField.tableName)) {
            val columns = new util.HashSet[String]()
            columns.add(relatedField.fieldName.toLowerCase())
            relatedTableToColumnsMap.put(relatedField.tableName, columns)
          } else {
            relatedTableToColumnsMap.get(relatedField.tableName)
              .add(relatedField.fieldName.toLowerCase())
          }
      }
    }
    val relatedTableIds = relatedTables.map { table =>
      val relatedTableId = new RelationIdentifier(table.database, table.identifier.table, "")
      relatedTableId.setTablePath(FileFactory.getUpdatedFilePath(table.location.toString))
      relatedTableId.setProvider(table.provider.get)
      relatedTableId
    }
    val viewIdentifier = new RelationIdentifier(
        tableIdentifier.database.get, tableIdentifier.table,
        CarbonEnv.getCarbonTable(tableIdentifier)(session).getTableId)
    viewIdentifier.setTablePath(viewTablePath)
    val schema = new MVSchema(viewManager)
    schema.setIdentifier(viewIdentifier)
    schema.setProperties(mutable.Map[String, String](viewProperties.toSeq: _*).asJava)
    schema.setRelatedTableColumnList(relatedTableToColumnsMap)
    schema.setColumnsOrderMap(columnOrderMap)
    schema.setRelatedTables(new util.ArrayList[RelationIdentifier](relatedTableIds.asJava))
    schema.getProperties.put(MVProperty.REFRESH_MODE, viewRefreshMode)
    schema.getProperties.put(MVProperty.REFRESH_TRIGGER_MODE, viewRefreshTriggerMode)
    if (null != granularity && null != timeSeriesColumn) {
      schema.setTimeSeries(true)
    }
    schema.setQuery(queryString)
    // The modified query is stored only for views that are not refreshed in full mode
    if (!viewRefreshMode.equals(MVProperty.REFRESH_MODE_FULL)) {
      schema.setModifiedQuery(modifiedQueryString)
    }
    try {
      viewManager.createSchema(schema.getIdentifier.getDatabaseName, schema)
    } catch {
      case exception: Exception =>
        // Roll back the table created above. A failure of the rollback itself must not hide
        // the reason the schema could not be created, so it is attached as a suppressed
        // exception and the original failure is always the one rethrown.
        try {
          val dropTableCommand = CarbonDropTableCommand(
            ifExistsSet = true,
            Option(schema.getIdentifier.getDatabaseName),
            schema.getIdentifier.getTableName,
            dropChildTable = true,
            isInternalCall = true)
          dropTableCommand.run(session)
        } catch {
          case cleanupFailure: Exception =>
            exception.addSuppressed(cleanupFailure)
        }
        throw exception
    }
    schema
  }