override def processMetadata()

in integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala [43:205]

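The metadata phase of ALTER TABLE ... DROP COLUMNS: the command acquires the metadata and compaction locks, validates that every requested column exists and is not a partition, bucket, or spatial-index source column (nor covered by an index), flags the matching column schemas, including the children of complex columns, as invisible in the Thrift schema, records a schema evolution entry, and fires pre- and post-drop events. On failure the schema change is reverted using the recorded timestamp, and the locks are released in all cases.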

  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    val tableName = alterTableDropColumnModel.tableName
    val dbName = alterTableDropColumnModel.databaseName
      .getOrElse(sparkSession.catalog.currentDatabase)
    setAuditTable(dbName, tableName)
    setAuditInfo(Map("column" -> alterTableDropColumnModel.columns.mkString(", ")))
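    // the metadata lock blocks concurrent schema changes and the compaction lock
    // blocks concurrent compaction while the schema is modified; both are released
    // in the finally block, and timeStamp marks the schema evolution entry so a
    // failed operation can be reverted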
    var locks = List.empty[ICarbonLock]
    var timeStamp = 0L
    val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
    // get the latest carbon table and check for column existence
    var carbonTable: CarbonTable = null
    try {
      locks = AlterTableUtil
        .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
      val metastore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
      carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
      if (carbonTable.isIndexTable) {
        throw new MalformedCarbonCommandException(
          "alter table drop column is not supported for index table")
      }
      if (!carbonTable.canAllow(carbonTable, TableOperation.ALTER_DROP,
          alterTableDropColumnModel.columns.asJava)) {
        throw new MalformedCarbonCommandException(
          "alter table drop column is not supported for index indexSchema")
      }
      // Do not allow spatial index source columns to be dropped.
      AlterTableUtil.validateColumnsWithSpatialIndexProperties(carbonTable,
        alterTableDropColumnModel.columns)
      val partitionInfo = carbonTable.getPartitionInfo()
      val tableColumns = carbonTable.getCreateOrderColumn().asScala
      if (partitionInfo != null) {
        val partitionColumnSchemaList = partitionInfo.getColumnSchemaList.asScala
          .map(_.getColumnName)
        // check each column existence in the table
        val partitionColumns = alterTableDropColumnModel.columns.filter {
          tableColumn => partitionColumnSchemaList.contains(tableColumn)
        }
        if (partitionColumns.nonEmpty) {
          throwMetadataException(dbName, tableName, "Partition columns cannot be dropped: " +
                                                  s"$partitionColumns")
        }

        // if the table has, for example, only two columns, one partition column and
        // one non-partition column, dropping the non-partition column would leave a
        // table made up solely of partition columns, which is not allowed
        val remainingColumns = tableColumns
          .filterNot(col => alterTableDropColumnModel.columns.contains(col.getColName))
          .map(_.getColName)
        if (remainingColumns.equals(partitionColumnSchemaList)) {
          throw new MalformedCarbonCommandException(
            "alter table drop column is failed, cannot have the table with all columns as " +
            "partition columns")
        }
      }
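      // bucket columns define how data is distributed into buckets, so they
      // cannot be dropped either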
      val bucketInfo = carbonTable.getBucketingInfo
      if (bucketInfo != null) {
        val bucketColumnSchemaList = bucketInfo.getListOfColumns.asScala
          .map(_.getColumnName)
        // check each column existence in the table
        val bucketColumns = alterTableDropColumnModel.columns.filter {
          tableColumn => bucketColumnSchemaList.contains(tableColumn)
        }
        if (bucketColumns.nonEmpty) {
          throwMetadataException(dbName, tableName, "Bucket columns cannot be dropped: " +
            s"$bucketColumns")
        }
      }

      var dictionaryColumns =
        Seq.empty[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]
      // bucketed columns are already rejected above; here each requested column is
      // validated to exist, and DATE (direct-dictionary) columns are collected
      alterTableDropColumnModel.columns.foreach { column =>
        var columnExist = false
        tableColumns.foreach { tableColumn =>
          // column should not be already deleted and should exist in the table
          if (!tableColumn.isInvisible && column.equalsIgnoreCase(tableColumn.getColName)) {
            if (tableColumn.isDimension) {
              if (tableColumn.getDataType == DataTypes.DATE) {
                dictionaryColumns ++= Seq(tableColumn.getColumnSchema)
              }
            }
            columnExist = true
          }
        }
        if (!columnExist) {
          throwMetadataException(dbName, tableName,
            s"Column $column does not exists in the table $dbName.$tableName")
        }
      }

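      // listeners registered on the OperationListenerBus (for example, index
      // maintenance) can react to the pre- and post-drop events fired below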
      val operationContext = new OperationContext
      // event will be fired before dropping the columns
      val alterTableDropColumnPreEvent: AlterTableDropColumnPreEvent = AlterTableDropColumnPreEvent(
        carbonTable,
        alterTableDropColumnModel,
        sparkSession)
      OperationListenerBus.getInstance().fireEvent(alterTableDropColumnPreEvent, operationContext)

      // read the latest schema file
      val tableInfo: org.apache.carbondata.format.TableInfo =
        metastore.getThriftTableInfo(carbonTable)
      // deletedColumnSchema holds the parent column schema and, for complex types,
      // its children as well; deletedTableColumns holds only the parent columns,
      // which go into the schemaEvolution entry
      val deletedColumnSchema = ListBuffer[org.apache.carbondata.format.ColumnSchema]()
      val deletedTableColumns = ListBuffer[org.apache.carbondata.format.ColumnSchema]()
      val columnSchemaList = tableInfo.fact_table.table_columns.asScala
      alterTableDropColumnModel.columns.foreach { column =>
        columnSchemaList.foreach { columnSchema =>
          if (!columnSchema.invisible) {
            if (column.equalsIgnoreCase(columnSchema.column_name)) {
              val columnSchemaCopy = columnSchema.deepCopy
              deletedTableColumns += columnSchemaCopy
              deletedColumnSchema += columnSchemaCopy
              columnSchema.invisible = true
            } else if (columnSchema.column_name.toLowerCase
              .startsWith(column.toLowerCase + CarbonCommonConstants.POINT)) {
              // if the column to be dropped is of complex type then its children are prefixed
              // with -> parent_name + '.'
              deletedColumnSchema += columnSchema.deepCopy
              columnSchema.invisible = true
            }
          }
        }
      }
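      // the drop is logical at this level: column schemas are only flagged as
      // invisible, and no data files are rewritten here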
      // add deleted columns to schema evolution history and update the schema
      timeStamp = System.currentTimeMillis
      val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
      schemaEvolutionEntry.setRemoved(deletedTableColumns.toList.asJava)
      val schemaConverter = new ThriftWrapperSchemaConverterImpl
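      // convert the dropped columns from their Thrift (external) representation
      // back to the wrapper schema expected by the metastore update below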
      val delCols = deletedColumnSchema.map { deletedColumn =>
        schemaConverter.fromExternalToWrapperColumnSchema(deletedColumn)
      }
      val tableIdentifier = AlterTableUtil.updateSchemaInfo(
        carbonTable,
        schemaEvolutionEntry,
        tableInfo)(sparkSession)
      AlterTableUtil.deleteColsAndUpdateSchema(carbonTable,
        delCols, tableIdentifier, sparkSession)
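      // test-only hook that lets tests inject a failure at this point to exercise
      // the revert path in the catch block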
      new MockClassForAlterRevertTests().mockForAlterAddColRevertTest()
      // TODO: 1. add check for deletion of index tables

      // event will be fired after the columns are dropped
      val alterTableDropColumnPostEvent: AlterTableDropColumnPostEvent =
        AlterTableDropColumnPostEvent(
          carbonTable,
          alterTableDropColumnModel,
          sparkSession)
      OperationListenerBus.getInstance().fireEvent(alterTableDropColumnPostEvent, operationContext)

      LOGGER.info(s"Alter table for drop columns is successful for table $dbName.$tableName")
    } catch {
      case e: Exception =>
        if (carbonTable != null) {
          AlterTableUtil.revertDropColumnChanges(dbName, tableName, timeStamp)(sparkSession)
        }
        throwMetadataException(dbName, tableName,
          s"Alter table drop column operation failed: ${e.getMessage}")
    } finally {
      // release lock after command execution completion
      AlterTableUtil.releaseLocks(locks)
    }
    Seq.empty
  }
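
For context, a minimal sketch of the SQL path that reaches this command, assuming a SparkSession built with CarbonData's Spark extensions on the classpath (the app name, table, and column names are illustrative, not taken from the source above):

  import org.apache.spark.sql.SparkSession

  // minimal sketch, assuming CarbonData's extensions are available; the table
  // and column names below are illustrative
  val spark = SparkSession.builder()
    .appName("drop-column-sketch")
    .config("spark.sql.extensions", "org.apache.spark.sql.CarbonExtensions")
    .getOrCreate()

  spark.sql("CREATE TABLE demo (id INT, name STRING, dept STRING) STORED AS carbondata")

  // drives processMetadata above; 'dept' is a plain column, so every validation passes
  spark.sql("ALTER TABLE demo DROP COLUMNS(dept)")

  // would fail the existence check with "Column salary does not exist ..."
  // spark.sql("ALTER TABLE demo DROP COLUMNS(salary)")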