override def processData()

in integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala [126:309]


  override def processData(sparkSession: SparkSession): Seq[Row] = {
    if (isInsertIntoWithConverterFlow) {
      return CarbonInsertIntoWithDf(
        databaseNameOp = databaseNameOp,
        tableName = tableName,
        options = options,
        isOverwriteTable = isOverwriteTable,
        dataFrame = dataFrame,
        updateModel = None,
        tableInfoOp = Some(tableInfo),
        internalOptions = internalOptions,
        partition = partition).process(sparkSession)
    }
    val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
    val hadoopConf = sparkSession.sessionState.newHadoopConf()
    // disable zookeeper based locking for this load
    CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false")
    // the insert flow loads from a dataframe/plan, so there is no fact file path
    val factPath = ""
    currPartitions = CommonLoadUtils.getCurrentPartitions(sparkSession, table)
    CommonLoadUtils.setNumberOfCoresWhileLoading(sparkSession)
    val optionsFinal: util.Map[String, String] = CommonLoadUtils.getFinalLoadOptions(table, options)
    val carbonLoadModel: CarbonLoadModel = CommonLoadUtils.prepareLoadModel(
      hadoopConf = hadoopConf,
      factPath = factPath,
      optionsFinal = optionsFinal,
      parentTablePath = parentTablePath,
      table = table,
      isDataFrame = true,
      internalOptions = internalOptions,
      partition = partition,
      options = options)
    val (tf, df) = CommonLoadUtils.getTimeAndDateFormatFromLoadModel(carbonLoadModel)
    timeStampFormat = tf
    dateFormat = df
    val partitionInfo = tableInfo.getFactTable.getPartitionInfo
    val partitionColumnSchema =
      if (partitionInfo != null && partitionInfo.getColumnSchemaList.size() != 0) {
        partitionInfo.getColumnSchemaList.asScala
      } else {
        null
      }
    val convertedStaticPartition = getConvertedStaticPartitionMap(partitionColumnSchema)
    val (reArrangedIndex, reArrangedMVIndex, selectedColumnSchema) =
      getReArrangedIndexAndSelectedSchema(
        tableInfo,
        partitionColumnSchema,
        carbonLoadModel)
    val newLogicalPlan = getReArrangedLogicalPlan(
      reArrangedIndex,
      selectedColumnSchema,
      convertedStaticPartition)
    scanResultRdd = sparkSession.sessionState.executePlan(newLogicalPlan).toRdd
    if (logicalPartitionRelation != null) {
      if (selectedColumnSchema.length != logicalPartitionRelation.output.length) {
        throw new RuntimeException(
          s"Selected schema length ${selectedColumnSchema.length} doesn't match the " +
          s"partition relation output length ${logicalPartitionRelation.output.length}")
      }
      val isNotReArranged = selectedColumnSchema.zipWithIndex.exists {
        case (cs, i) => !cs.getColumnName.equals(logicalPartitionRelation.output(i).name)
      }
      if (isNotReArranged) {
        // Re-arrange the catalog table schema and output for partition relation
        logicalPartitionRelation =
          if (carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isMV) {
            // Re-arrange non-partition columns in the catalog table schema based on the
            // rearranged MV index order. Example: MV columns c1,c2(partition_column),
            // c3(sort_column),c4. For this order the rearranged index is (2,0,3,1), while the
            // catalog table schema is (c1,c3,c4,c2) because the partition column is always kept
            // last. Rearranging the logical relation with (2,0,3,1) would turn the catalog table
            // schema into (c4,c1,c2,c3), which is wrong. Hence, reorder the MV create column
            // order to (c1,c3,c4,c2) and use the rearranged MV index (1,0,2,3) to rearrange the
            // logical relation schema.
            getReArrangedSchemaLogicalRelation(reArrangedMVIndex, logicalPartitionRelation)
          } else {
            getReArrangedSchemaLogicalRelation(reArrangedIndex, logicalPartitionRelation)
          }
      }
    }
    var isUpdateTableStatusRequired = false
    val uuid = ""
    var loadResultForReturn: LoadMetadataDetails = null
    var rowsForReturn: Seq[Row] = Seq.empty
    try {
      val (tableIndexes, indexOperationContext) =
        CommonLoadUtils.firePreLoadEvents(
          sparkSession = sparkSession,
          carbonLoadModel = carbonLoadModel,
          uuid = uuid,
          factPath = factPath,
          optionsFinal = optionsFinal,
          options = options.asJava,
          isOverwriteTable = isOverwriteTable,
          isDataFrame = true,
          updateModel = updateModel,
          operationContext = operationContext)

      // add the start entry for the new load in the table status file
      if (!table.isHivePartitionTable) {
        if (updateModel.isDefined) {
          carbonLoadModel.setFactTimeStamp(updateModel.get.updatedTimeStamp)
        }
        CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
          carbonLoadModel,
          isOverwriteTable)
        isUpdateTableStatusRequired = true
      }
      if (isUpdateTableStatusRequired) {
        CarbonHiveIndexMetadataUtil.updateTableStatusVersion(carbonLoadModel
          .getCarbonDataLoadSchema
          .getCarbonTable, sparkSession, carbonLoadModel.getLatestTableStatusWriteVersion)
      }
      if (isOverwriteTable) {
        LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress")
      }
      // Create table and metadata folders if not exist
      if (carbonLoadModel.isCarbonTransactionalTable) {
        val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
        if (!FileFactory.isFileExist(metadataDirectoryPath)) {
          FileFactory.mkdirs(metadataDirectoryPath)
        }
      } else {
        carbonLoadModel.setSegmentId(System.nanoTime().toString)
      }
      val partitionStatus = SegmentStatus.SUCCESS
      val loadParams = CarbonLoadParams(sparkSession,
        tableName,
        sizeInBytes,
        isOverwriteTable,
        carbonLoadModel,
        hadoopConf,
        logicalPartitionRelation,
        dateFormat,
        timeStampFormat,
        options,
        finalPartition,
        currPartitions,
        partitionStatus,
        None,
        Some(scanResultRdd),
        updateModel,
        operationContext)
      LOGGER.info("Sort Scope : " + carbonLoadModel.getSortScope)
      val (rows, loadResult) = insertData(loadParams)
      loadResultForReturn = loadResult
      rowsForReturn = rows
      val info = CommonLoadUtils.makeAuditInfo(loadResult)
      setAuditInfo(info)
      CommonLoadUtils.firePostLoadEvents(sparkSession,
        carbonLoadModel,
        tableIndexes,
        indexOperationContext,
        table,
        operationContext)
    } catch {
      case CausedBy(ex: NoRetryException) =>
        // update the load entry in table status file for changing the status to marked for delete
        if (isUpdateTableStatusRequired) {
          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
        }
        LOGGER.error(s"Dataload failure for $dbName.$tableName", ex)
        throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
      // In case of event related exception
      case preEventEx: PreEventException =>
        LOGGER.error(s"Dataload failure for $dbName.$tableName", preEventEx)
        throw new AnalysisException(preEventEx.getMessage)
      case ex: Exception =>
        LOGGER.error(ex)
        // update the load entry in table status file for changing the status to marked for delete
        if (isUpdateTableStatusRequired) {
          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
        }
        throw ex
    }
    if (loadResultForReturn != null && loadResultForReturn.getLoadName != null) {
      Seq(Row(loadResultForReturn.getLoadName))
    } else {
      // return the segment id in partition table case
      rowsForReturn
    }
  }
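
The MV comment in the re-arrangement branch above is easiest to follow with the index arithmetic written out. Below is a minimal, self-contained sketch in plain Scala (hypothetical names, not CarbonData API code) showing why the raw rearranged index cannot be applied to the partition-last catalog schema and why the separate MV index is needed; it only assumes Seq-by-index lookup.

  object ReArrangeIndexSketch extends App {
    // MV create order: c1, c2 (partition column), c3 (sort column), c4.
    // The catalog table schema keeps the partition column last:
    val catalogSchema = Seq("c1", "c3", "c4", "c2")

    // Rearranged index computed against the MV create order (sort column first): (2, 0, 3, 1).
    // Applying it to the partition-last catalog schema gives the wrong order.
    val reArrangedIndex = Seq(2, 0, 3, 1)
    println(reArrangedIndex.map(catalogSchema))   // List(c4, c1, c2, c3) -- wrong

    // Rearranged MV index computed against the partition-last order: (1, 0, 2, 3).
    // Applying it yields the sort column first and the partition column last, as intended.
    val reArrangedMVIndex = Seq(1, 0, 2, 3)
    println(reArrangedMVIndex.map(catalogSchema)) // List(c3, c1, c4, c2) -- correct
  }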