in integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala [126:309]
override def processData(sparkSession: SparkSession): Seq[Row] = {
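// If the converter-based (row conversion) insert flow is enabled, delegate to
// CarbonInsertIntoWithDf and return its result instead of the flow below.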
if (isInsertIntoWithConverterFlow) {
return CarbonInsertIntoWithDf(
databaseNameOp = databaseNameOp,
tableName = tableName,
options = options,
isOverwriteTable = isOverwriteTable,
dataFrame = dataFrame,
updateModel = None,
tableInfoOp = Some(tableInfo),
internalOptions = internalOptions,
partition = partition).process(sparkSession)
}
val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
val hadoopConf = sparkSession.sessionState.newHadoopConf()
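// Disable the ZooKeeper-based lock for this load.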
CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false")
val factPath = ""
currPartitions = CommonLoadUtils.getCurrentPartitions(sparkSession, table)
CommonLoadUtils.setNumberOfCoresWhileLoading(sparkSession)
val optionsFinal: util.Map[String, String] = CommonLoadUtils.getFinalLoadOptions(table, options)
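// Prepare the CarbonLoadModel that carries the configuration for this load.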
val carbonLoadModel: CarbonLoadModel = CommonLoadUtils.prepareLoadModel(
hadoopConf = hadoopConf,
factPath = factPath,
optionsFinal = optionsFinal,
parentTablePath = parentTablePath,
table = table,
isDataFrame = true,
internalOptions = internalOptions,
partition = partition,
options = options)
val (tf, df) = CommonLoadUtils.getTimeAndDateFormatFromLoadModel(carbonLoadModel)
timeStampFormat = tf
dateFormat = df
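// Pick up the partition column schema when the table is partitioned.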
val partitionInfo = tableInfo.getFactTable.getPartitionInfo
val partitionColumnSchema =
if (partitionInfo != null && partitionInfo.getColumnSchemaList.size() != 0) {
partitionInfo.getColumnSchemaList.asScala
} else {
null
}
val convertedStaticPartition = getConvertedStaticPartitionMap(partitionColumnSchema)
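// Compute the rearranged column indexes (plus a separate index for MV tables,
// see the comment further below) and the selected column schema used below.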
val (reArrangedIndex, reArrangedMVIndex, selectedColumnSchema) =
getReArrangedIndexAndSelectedSchema(
tableInfo,
partitionColumnSchema,
carbonLoadModel)
val newLogicalPlan = getReArrangedLogicalPlan(
reArrangedIndex,
selectedColumnSchema,
convertedStaticPartition)
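// Execute the rearranged plan and keep the resulting RDD as the insert source.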
scanResultRdd = sparkSession.sessionState.executePlan(newLogicalPlan).toRdd
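// Keep the partition relation's catalog schema consistent with the rearranged column order.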
if (logicalPartitionRelation != null) {
if (selectedColumnSchema.length != logicalPartitionRelation.output.length) {
throw new RuntimeException(" schema length doesn't match partition length")
}
val isNotReArranged = selectedColumnSchema.zipWithIndex.exists {
case (cs, i) => !cs.getColumnName.equals(logicalPartitionRelation.output(i).name)
}
if (isNotReArranged) {
// Re-arrange the catalog table schema and output for partition relation
logicalPartitionRelation =
if (carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isMV) {
// Re-arrange the non-partition columns in the catalog table schema based on the
// rearranged MV index order. Example: MV columns c1, c2 (partition column),
// c3 (sort column), c4. For this order, the rearranged index is (2,0,3,1), while
// the catalog table schema order is (c1,c3,c4,c2) because the partition column is
// always placed last. Rearranging the logical relation with that index would change
// the catalog table schema to (c4,c1,c2,c3), which is wrong. Hence, reorder the MV
// creation column order to (c1,c3,c4,c2) and use the rearranged MV index (1,0,2,3)
// to rearrange the logical relation schema.
getReArrangedSchemaLogicalRelation(reArrangedMVIndex, logicalPartitionRelation)
} else {
getReArrangedSchemaLogicalRelation(reArrangedIndex, logicalPartitionRelation)
}
}
}
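// Track whether a start entry was written to the table status file, so the entry
// can be marked for delete if the load fails.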
var isUpdateTableStatusRequired = false
val uuid = ""
var loadResultForReturn: LoadMetadataDetails = null
var rowsForReturn: Seq[Row] = Seq.empty
try {
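// Fire the pre-load events so table indexes and other listeners can act before the load.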
val (tableIndexes, indexOperationContext) =
CommonLoadUtils.firePreLoadEvents(
sparkSession = sparkSession,
carbonLoadModel = carbonLoadModel,
uuid = uuid,
factPath = factPath,
optionsFinal = optionsFinal,
options = options.asJava,
isOverwriteTable = isOverwriteTable,
isDataFrame = true,
updateModel = updateModel,
operationContext = operationContext)
// add the start entry for the new load in the table status file
if (!table.isHivePartitionTable) {
if (updateModel.isDefined) {
carbonLoadModel.setFactTimeStamp(updateModel.get.updatedTimeStamp)
}
CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
carbonLoadModel,
isOverwriteTable)
isUpdateTableStatusRequired = true
}
if (isUpdateTableStatusRequired) {
CarbonHiveIndexMetadataUtil.updateTableStatusVersion(
carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
sparkSession,
carbonLoadModel.getLatestTableStatusWriteVersion)
}
if (isOverwriteTable) {
LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress")
}
// Create table and metadata folders if not exist
if (carbonLoadModel.isCarbonTransactionalTable) {
val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
if (!FileFactory.isFileExist(metadataDirectoryPath)) {
FileFactory.mkdirs(metadataDirectoryPath)
}
} else {
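// For a non-transactional table, derive the segment id from the current nano time.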
carbonLoadModel.setSegmentId(System.nanoTime().toString)
}
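// Bundle everything the actual load needs into a single CarbonLoadParams.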
val partitionStatus = SegmentStatus.SUCCESS
val loadParams = CarbonLoadParams(sparkSession,
tableName,
sizeInBytes,
isOverwriteTable,
carbonLoadModel,
hadoopConf,
logicalPartitionRelation,
dateFormat,
timeStampFormat,
options,
finalPartition,
currPartitions,
partitionStatus,
None,
Some(scanResultRdd),
updateModel,
operationContext)
LOGGER.info("Sort Scope : " + carbonLoadModel.getSortScope)
val (rows, loadResult) = insertData(loadParams)
loadResultForReturn = loadResult
rowsForReturn = rows
val info = CommonLoadUtils.makeAuditInfo(loadResult)
setAuditInfo(info)
CommonLoadUtils.firePostLoadEvents(sparkSession,
carbonLoadModel,
tableIndexes,
indexOperationContext,
table,
operationContext)
} catch {
case CausedBy(ex: NoRetryException) =>
// update the load entry in table status file for changing the status to marked for delete
if (isUpdateTableStatusRequired) {
CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
}
LOGGER.error(s"Dataload failure for $dbName.$tableName", ex)
throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
// In case of event related exception
case preEventEx: PreEventException =>
LOGGER.error(s"Dataload failure for $dbName.$tableName", preEventEx)
throw new AnalysisException(preEventEx.getMessage)
case ex: Exception =>
LOGGER.error(ex)
// update the load entry in table status file for changing the status to marked for delete
if (isUpdateTableStatusRequired) {
CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
}
throw ex
}
if (loadResultForReturn != null && loadResultForReturn.getLoadName != null) {
Seq(Row(loadResultForReturn.getLoadName))
} else {
// return the segment id in partition table case
rowsForReturn
}
}