in integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala [85:507]
override def processData(sparkSession: SparkSession): Seq[Row] = {
val relations = CarbonSparkUtil.collectCarbonRelation(targetDsOri.logicalPlan)
val st = System.currentTimeMillis()
// if the input data is empty, return early to avoid unnecessary operations. This can happen
// in streaming cases where no new data has been pushed to the stream.
if (srcDS.rdd.isEmpty()) {
return Seq()
}
val targetDsAliasName = targetDsOri.logicalPlan match {
case alias: SubqueryAlias =>
alias.alias
case _ => null
}
val sourceAliasName = srcDS.logicalPlan match {
case alias: SubqueryAlias =>
alias.alias
case _ => null
}
if (relations.length != 1) {
throw new UnsupportedOperationException(
"Carbon table supposed to be present in merge dataset")
}
val properties = CarbonProperties.getInstance()
if (operationType != null) {
val filterDupes = properties
.getProperty(CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE,
CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE_DEFAULT).toBoolean
val isSchemaEnforcementEnabled = properties
.getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT,
CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean
if (
!MergeOperationType.withName(operationType.toUpperCase).equals(MergeOperationType.INSERT) &&
filterDupes) {
throw new MalformedCarbonCommandException("property CARBON_STREAMER_INSERT_DEDUPLICATE" +
" should only be set with operation type INSERT")
}
if (isSchemaEnforcementEnabled) {
// call the util function to verify if incoming schema matches with target schema
CarbonMergeDataSetUtil.verifySourceAndTargetSchemas(targetDsOri, srcDS)
} else {
CarbonMergeDataSetUtil.handleSchemaEvolution(
targetDsOri, srcDS, sparkSession)
}
}
// Target dataset must be backed by carbondata table.
val tgtTable = relations.head.carbonRelation.carbonTable
val targetCarbonTable: CarbonTable = CarbonEnv.getCarbonTable(Option(tgtTable.getDatabaseName),
tgtTable.getTableName)(sparkSession)
// select only the required columns; this avoids a lot of shuffling.
val targetDs = if (mergeMatches == null && operationType != null) {
targetDsOri.select(keyColumn)
} else {
// Get all the required columns of targetDS by going through all match conditions and actions.
val columns = getSelectExpressionsOnExistingDF(targetDsOri, mergeMatches, sparkSession)
targetDsOri.select(columns: _*)
}
// decide join type based on match conditions or based on merge operation type
val joinType = if (mergeMatches == null && operationType != null) {
MergeOperationType.withName(operationType.toUpperCase) match {
case MergeOperationType.UPDATE | MergeOperationType.DELETE =>
"inner"
case MergeOperationType.UPSERT =>
"right_outer"
case MergeOperationType.INSERT =>
null
}
} else {
decideJoinType
}
val joinColumns = if (mergeMatches == null) {
Seq(keyColumn)
} else {
mergeMatches.joinExpr.expr.collect {
case unresolvedAttribute: UnresolvedAttribute if unresolvedAttribute.nameParts.nonEmpty =>
// For a join condition like A.id = B.id, the join expression is an EqualTo whose children
// are column references such as UnresolvedAttribute(A.id). Since we only need the column
// names, we collect every UnresolvedAttribute and read its nameParts, an ArrayBuffer
// containing "A" and "id"; "id" is the column name, so nameParts.tail.head gives us "id".
unresolvedAttribute.nameParts.tail.head
}.distinct
}
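// Illustrative example: for a join expression such as "t.id = s.id AND t.region = s.region",
// the collect above picks up the UnresolvedAttributes of both sides and, after
// nameParts.tail.head and distinct, yields Seq("id", "region").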
// repartition the srcDS if the target table is bucketed and the bucketing columns contain
// the join columns
val repartitionedSrcDs =
if (targetCarbonTable.getBucketingInfo != null &&
targetCarbonTable.getBucketingInfo
.getListOfColumns
.asScala
.map(_.getColumnName).containsSlice(joinColumns)) {
srcDS.repartition(targetCarbonTable.getBucketingInfo.getNumOfRanges,
joinColumns.map(srcDS.col): _*)
} else {
srcDS
}
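// Illustrative sketch: if the target table is bucketed on "id" into 8 buckets and "id" is the
// join key, the source dataset is repartitioned above into 8 partitions on "id", so the join
// lines up with the target's bucketing and avoids an extra shuffle of the target side.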
// deduplicate the incoming dataset
// TODO: handle the case for partial updates
val orderingField = properties.getProperty(
CarbonCommonConstants.CARBON_STREAMER_SOURCE_ORDERING_FIELD,
CarbonCommonConstants.CARBON_STREAMER_SOURCE_ORDERING_FIELD_DEFAULT)
val deduplicatedSrcDs = if (keyColumn != null) {
CarbonMergeDataSetUtil.deduplicateBeforeWriting(repartitionedSrcDs,
targetDs,
sparkSession,
sourceAliasName,
targetDsAliasName,
keyColumn,
orderingField,
targetCarbonTable)
} else {
repartitionedSrcDs
}
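// Illustrative example of the deduplication above (assuming the latest record per key wins):
// with keyColumn = "id" and orderingField = "ts", incoming rows (id=1, ts=10) and (id=1, ts=20)
// are reduced to the single row with ts = 20 before the merge is applied.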
// cache the source data as we will be scanning it multiple times
deduplicatedSrcDs.cache()
val deDuplicatedRecords = deduplicatedSrcDs.count()
LOGGER.info(s"Number of records from source data: $deDuplicatedRecords")
// Create accumulators to log the stats
val stats = Stats(createLongAccumulator("insertedRows"),
createLongAccumulator("updatedRows"),
createLongAccumulator("deletedRows"))
var finalCarbonFilesToScan: Array[String] = Array.empty[String]
// Pruning is done only when the join type is not full_outer. For a full_outer join we need
// all the records from the left (target) table, so there is no need to prune the target
// table based on the min/max of the source table.
val isMinMaxPruningEnabled = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_CDC_MINMAX_PRUNING_ENABLED,
CarbonCommonConstants.CARBON_CDC_MINMAX_PRUNING_ENABLED_DEFAULT).toBoolean
var didNotPrune = false
breakable {
if (isMinMaxPruningEnabled && joinType != null && !joinType.equalsIgnoreCase("full_outer")) {
// 1. get all the join columns from equal-to conditions, i.e. the equi-join columns
val targetKeyColumns = CarbonMergeDataSetUtil.getTargetTableKeyColumns(keyColumn,
targetDsAliasName,
targetCarbonTable,
mergeMatches)
val joinCarbonColumns = targetKeyColumns.collect {
case column => targetCarbonTable.getColumnByName(column)
}
LOGGER
.info(s"Key columns for join are: ${ joinCarbonColumns.map(_.getColName).mkString(",") }")
val columnToIndexMap: util.Map[String, Integer] = new util.LinkedHashMap[String, Integer]
// get the min/max cached columns and, based on them, determine the index to check in the
// min-max array or index row
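// Illustrative reading of the index mapping below: a dimension column maps to its own ordinal,
// while a measure column maps to (number of visible dimensions + its ordinal), assuming the
// min-max index row lays out dimension entries first and measure entries after them.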
val minMaxColumns = targetCarbonTable.getMinMaxCachedColumnsInCreateOrder
if (minMaxColumns.size() != 0) {
if (minMaxColumns.size() ==
targetCarbonTable.getTableInfo.getFactTable.getListOfColumns.size() ||
minMaxColumns.size() == 1 && minMaxColumns.get(0).equalsIgnoreCase("All columns")) {
joinCarbonColumns.foreach { column =>
if (column.isDimension) {
columnToIndexMap.put(column.getColName, column.getOrdinal)
} else {
columnToIndexMap.put(column.getColName,
targetCarbonTable.getVisibleDimensions.size() + column.getOrdinal)
}
}
} else {
// handle the case where only some columns are cached and check whether those cached columns
// contain the target key/join columns
val joinColumnsPresentInMinMaxCacheCols = joinCarbonColumns.map(_.getColName)
.intersect(minMaxColumns.asScala.toSet)
if (joinColumnsPresentInMinMaxCacheCols.isEmpty ||
joinColumnsPresentInMinMaxCacheCols.size != joinCarbonColumns.size) {
// 1. if none of the join columns are present in the cached columns, all blocklets
// would be selected anyway, so pruning is not required
// 2. if any join column is missing from the cached columns, we must not prune, as pruning
// on a partial set of columns may lead to wrong data with filter conditions like OR
didNotPrune = true
break()
}
}
}
// get the required splits; this also loads the index cache either in the index server or in
// the driver, depending on the configuration
val columnMinMaxInBlocklet: util.LinkedHashMap[String, util.List[FilePathMinMaxVO]] =
new util.LinkedHashMap[String, util.List[FilePathMinMaxVO]]
val colToSplitsFilePathAndMinMaxMap: mutable.Map[String, util.List[FilePathMinMaxVO]] =
CarbonMergeDataSetUtil.getSplitsAndLoadToCache(targetCarbonTable,
deduplicatedSrcDs,
columnMinMaxInBlocklet,
columnToIndexMap,
sparkSession)
LOGGER.info("Finished getting splits from driver or index server")
// 2. get the (filePath, min, max) tuples of the required columns. The min/max values are
// converted to actual values based on the datatype, and the collection stores min and max
// only at the block level.
val fileMinMaxMapListOfAllJoinColumns: mutable.ArrayBuffer[(mutable.Map[String,
(AnyRef, AnyRef)], CarbonColumn)] =
mutable.ArrayBuffer.empty[(mutable.Map[String, (AnyRef, AnyRef)], CarbonColumn)]
val joinColumnsToComparatorMap:
mutable.LinkedHashMap[CarbonColumn, SerializableComparator] =
mutable.LinkedHashMap.empty[CarbonColumn, SerializableComparator]
joinCarbonColumns.foreach { joinColumn =>
val joinDataType = joinColumn.getDataType
val isPrimitiveAndNotDate = DataTypeUtil.isPrimitiveColumn(joinDataType) &&
(joinDataType != DataTypes.DATE)
val comparator = if (isPrimitiveAndNotDate) {
Comparator.getComparator(joinDataType)
} else if (joinDataType == DataTypes.STRING) {
null
} else {
Comparator.getComparatorByDataTypeForMeasure(joinDataType)
}
joinColumnsToComparatorMap += (joinColumn -> comparator)
}
// 3. prepare (filepath, (min, max)) at a block level.
CarbonMergeDataSetUtil.addFilePathAndMinMaxTuples(colToSplitsFilePathAndMinMaxMap,
targetCarbonTable,
joinColumnsToComparatorMap,
fileMinMaxMapListOfAllJoinColumns)
// 4. prepare a mapping from each join column to a range tree built from the (filePath, min,
// max) tuples of that column. The assumption here is that the join expression involves few
// columns in practice, typically just a primary key column.
val joinColumnToTreeMapping: mutable.LinkedHashMap[CarbonColumn, BlockMinMaxTree] =
mutable.LinkedHashMap.empty[CarbonColumn, BlockMinMaxTree]
fileMinMaxMapListOfAllJoinColumns.foreach { case (fileMinMaxMap, joinCarbonColumn) =>
val joinDataType = joinCarbonColumn.getDataType
val isDimension = joinCarbonColumn.isDimension
val isPrimitiveAndNotDate = DataTypeUtil.isPrimitiveColumn(joinDataType) &&
(joinDataType != DataTypes.DATE)
val comparator = joinColumnsToComparatorMap(joinCarbonColumn)
val rangeIntervalTree = new BlockMinMaxTree(isPrimitiveAndNotDate,
isDimension, joinDataType, comparator)
fileMinMaxMap.foreach { case (filePath, minMax) =>
rangeIntervalTree.insert(new MinMaxNode(filePath, minMax._1, minMax._2))
}
joinColumnToTreeMapping += ((joinCarbonColumn, rangeIntervalTree))
}
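// Illustrative sketch of how the trees are used: if one carbondata file has min/max (100, 200)
// for the join column "id" and another has (201, 300), then a source row with id = 150 matches
// only the first file's range, so only that file needs to be scanned for the merge.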
// 5. map over the source dataset and, for each row, search the min-max trees prepared above
// to find the carbondata file paths that need to be scanned.
finalCarbonFilesToScan = CarbonMergeDataSetUtil.getFilesToScan(joinCarbonColumns,
joinColumnToTreeMapping,
deduplicatedSrcDs)
LOGGER.info(s"Finished min-max pruning. Carbondata files to scan during merge is: ${
finalCarbonFilesToScan.length}")
}
}
// check if it is a plain upsert/update/delete/insert operation and dispatch to the
// corresponding merge handler
if (mergeMatches == null && operationType != null) {
val isInsertOperation = operationType.equalsIgnoreCase(MergeOperationType.INSERT.toString)
val frame = if (isMinMaxPruningEnabled && !didNotPrune) {
// if min-max pruning is enabled, add the getBlockPaths UDF filter so that only the pruned
// carbondata files of the target table are scanned.
if (!isInsertOperation) {
targetDs
.withColumn(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, expr("getTupleId()"))
.where(s"getBlockPaths('${finalCarbonFilesToScan.mkString(",")}')")
.join(deduplicatedSrcDs.select(keyColumn),
expr(s"$targetDsAliasName.$keyColumn = $sourceAliasName.$keyColumn"),
joinType)
} else {
null
}
} else {
if (!isInsertOperation) {
targetDs
.withColumn(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, expr("getTupleId()"))
.join(deduplicatedSrcDs.select(keyColumn),
expr(s"$targetDsAliasName.$keyColumn = $sourceAliasName.$keyColumn"),
joinType)
} else {
null
}
}
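// Shape of the frame above (a sketch, based on the join types chosen earlier): for UPSERT the
// right_outer join keeps every source row, so rows with a null target side are new keys that
// are expected to become inserts, while rows carrying a tupleId point at existing target rows
// to update; UPDATE and DELETE use an inner join, so only matched rows reach their handlers,
// and INSERT skips the join entirely (frame stays null).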
val mergeHandler: MergeHandler =
MergeOperationType.withName(operationType.toUpperCase) match {
case MergeOperationType.UPSERT =>
UpsertHandler(sparkSession, frame, targetCarbonTable, stats, deduplicatedSrcDs)
case MergeOperationType.UPDATE =>
UpdateHandler(sparkSession, frame, targetCarbonTable, stats, deduplicatedSrcDs)
case MergeOperationType.DELETE =>
DeleteHandler(sparkSession, frame, targetCarbonTable, stats, deduplicatedSrcDs)
case MergeOperationType.INSERT =>
InsertHandler(sparkSession, frame, targetCarbonTable, stats, deduplicatedSrcDs)
}
// execute merge handler
mergeHandler.handleMerge()
LOGGER.info(
" Time taken to merge data :: " + (System.currentTimeMillis() - st))
// clear the cached src
deduplicatedSrcDs.unpersist()
return Seq()
}
// validate the merge matches and actions.
validateMergeActions(mergeMatches, targetDsOri, sparkSession)
val hasDelAction = mergeMatches.matchList
.exists(_.getActions.exists(_.isInstanceOf[DeleteAction]))
val hasUpdateAction = mergeMatches.matchList
.exists(_.getActions.exists(_.isInstanceOf[UpdateAction]))
val (insertHistOfUpdate, insertHistOfDelete) = getInsertHistoryStatus(mergeMatches)
// Update the update mapping with the unfilled columns. From here on, the system assumes all
// mappings exist.
mergeMatches = updateMappingIfNotExists(mergeMatches, targetDs)
// Generate all condition combinations as one column and add it as 'status'.
val condition = generateStatusColumnWithAllCombinations(mergeMatches)
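// A note on the status column (an assumption about its use): each joined row gets an integer
// encoding which match/action combination applied to it, so the projection step later can
// route the row to insert, update or delete handling without re-evaluating the conditions.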
// Add the getTupleId() UDF to get the tuple id needed to generate the delete delta.
val frame = if (isMinMaxPruningEnabled && !didNotPrune) {
targetDs
.withColumn(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, expr("getTupleId()"))
.withColumn("exist_on_target", lit(1))
.where(s"getBlockPaths('${finalCarbonFilesToScan.mkString(",")}')")
.join(deduplicatedSrcDs.withColumn("exist_on_src", lit(1)),
mergeMatches.joinExpr,
joinType)
.withColumn(status_on_mergeds, condition)
} else {
targetDs
.withColumn(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, expr("getTupleId()"))
.withColumn("exist_on_target", lit(1))
.join(deduplicatedSrcDs.withColumn("exist_on_src", lit(1)),
mergeMatches.joinExpr,
joinType)
.withColumn(status_on_mergeds, condition)
}
if (LOGGER.isDebugEnabled) {
frame.explain()
}
val tableCols =
targetCarbonTable.getCreateOrderColumn.asScala.map(_.getColName).
filterNot(_.equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE))
val header = tableCols.mkString(",")
val frameWithoutStatusCol = frame.drop(status_on_mergeds)
val projections: Seq[Seq[MergeProjection]] = mergeMatches.matchList.map { m =>
m.getActions.map {
case u: UpdateAction => MergeProjection(tableCols,
frameWithoutStatusCol,
relations.head,
sparkSession,
u)
case i: InsertAction => MergeProjection(tableCols,
frameWithoutStatusCol,
relations.head,
sparkSession,
i)
case d: DeleteAction => MergeProjection(tableCols,
frameWithoutStatusCol,
relations.head,
sparkSession,
d)
case _ => null
}.filter(_ != null)
}
val targetSchema = StructType(tableCols.map { f =>
relations.head.carbonRelation.schema.find(_.name.equalsIgnoreCase(f)).get
} ++ Seq(StructField(status_on_mergeds, IntegerType)))
val (processedRDD, deltaPath) = processIUD(sparkSession, frame, targetCarbonTable, projections,
targetSchema, stats)
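// A note on the two outputs above (inferred from how they are consumed below): processedRDD
// carries the projected rows to be written back to the target table, while deltaPath points to
// Avro files holding the delta records (tuple ids) for delete/update handling; the delta is
// read back, applied through the mutation action, and the path is removed afterwards.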
val executorErrors = ExecutionErrors(FailureCauses.NONE, "")
val trxMgr = TranxManager(System.currentTimeMillis())
val mutationAction = MutationActionFactory.getMutationAction(sparkSession,
targetCarbonTable, hasDelAction, hasUpdateAction,
insertHistOfUpdate, insertHistOfDelete)
val loadDF = Dataset.ofRows(sparkSession,
LogicalRDD(targetSchema.toAttributes,
processedRDD)(sparkSession))
loadDF.cache()
val count = loadDF.count()
val updateTableModel = if (FileFactory.isFileExist(deltaPath)) {
val deltaRdd = AvroFileFormatFactory.readAvro(sparkSession, deltaPath)
val tuple = mutationAction.handleAction(deltaRdd, executorErrors, trxMgr)
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(deltaPath))
MergeUtil.updateSegmentStatusAfterUpdateOrDelete(targetCarbonTable,
trxMgr.getLatestTrx, tuple)
Some(UpdateTableModel(isUpdate = true, trxMgr.getLatestTrx,
executorErrors, tuple._2, Option.empty))
} else {
None
}
val dataFrame = loadDF.select(tableCols.map(col): _*)
MergeUtil.insertDataToTargetTable(sparkSession,
targetCarbonTable,
header,
updateTableModel,
dataFrame)
if (hasDelAction && count == 0) {
MergeUtil.updateStatusIfJustDeleteOperation(targetCarbonTable, trxMgr.getLatestTrx)
}
LOGGER.info(s"Total inserted rows: ${stats.insertedRows.sum}")
LOGGER.info(s"Total updated rows: ${stats.updatedRows.sum}")
LOGGER.info(s"Total deleted rows: ${stats.deletedRows.sum}")
LOGGER.info(
" Time taken to merge data :: " + (System.currentTimeMillis() - st))
// Load the history table if the insert-into-history-table action was added by the user.
HistoryTableLoadHelper.loadHistoryTable(sparkSession, relations.head, targetCarbonTable,
trxMgr, mutationAction, mergeMatches)
// Do IUD Compaction.
HorizontalCompaction.tryHorizontalCompaction(
sparkSession, targetCarbonTable)
// clear the cached src
deduplicatedSrcDs.unpersist()
Seq.empty
}