in integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala [94:355]
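/**
 * Computes and writes the delete delta for the rows identified by the implicit
 * tupleId column of dataRdd. Returns, per block, the delete status and the
 * SegmentUpdateDetails produced on the executors, together with the BlockMappingVO
 * that maps each block to its segment and total row count.
 */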
def deleteDeltaExecutionInternal(
databaseNameOp: Option[String],
tableName: String,
sparkSession: SparkSession,
dataRdd: RDD[Row],
timestamp: String,
isUpdateOperation: Boolean,
executorErrors: ExecutionErrors,
tupleId: Option[Int] = None):
(Array[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))]], BlockMappingVO) = {
var res: Array[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))]] = null
val database = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
val tablePath = absoluteTableIdentifier.getTablePath
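// for an update operation project the incoming rows down to the implicit tupleId
// column; only the row identity is needed to build the delete delta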
val deleteRdd = if (isUpdateOperation) {
val schema =
org.apache.spark.sql.types.StructType(Seq(org.apache.spark.sql.types.StructField(
CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID,
org.apache.spark.sql.types.StringType)))
val rdd = tupleId match {
case Some(id) =>
dataRdd
.map(row => Row(row.get(id)))
case _ =>
dataRdd
.map(row => Row(row.get(row.fieldIndex(
CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))))
}
sparkSession.createDataFrame(rdd, schema).rdd
} else {
dataRdd
}
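// group the rows to be deleted by their segment/block key (derived from the tupleId)
// so that all deletes belonging to one block are handled together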
val (carbonInputFormat, job) = createCarbonInputFormat(absoluteTableIdentifier)
CarbonInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo)
val keyRdd = tupleId match {
case Some(id) =>
deleteRdd.map { row =>
val tupleId: String = row.getString(id)
val key = CarbonUpdateUtil.getSegmentWithBlockFromTID(tupleId,
carbonTable.isHivePartitionTable)
(key, row)
}.groupByKey()
case _ =>
deleteRdd.map { row =>
val tupleId: String = row
.getString(row.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
val key = CarbonUpdateUtil.getSegmentWithBlockFromTID(tupleId,
carbonTable.isHivePartitionTable)
(key, row)
}.groupByKey()
}
// if no loads are present then there is nothing to do.
if (keyRdd.partitions.length == 0) {
return (Array.empty[List[(SegmentStatus,
(SegmentUpdateDetails, ExecutionErrors, Long))]], null)
}
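// fetch the total row count of every block; it is needed later to decide whether a
// block becomes fully deleted and can be marked for delete as a whole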
val blockMappingVO =
carbonInputFormat.getBlockRowCount(
job,
carbonTable,
CarbonFilters.getPartitions(
Seq.empty,
sparkSession,
TableIdentifier(tableName, databaseNameOp)).map(_.asJava).orNull, true)
val segmentUpdateStatusMngr = new SegmentUpdateStatusManager(carbonTable)
CarbonUpdateUtil
.createBlockDetailsMap(blockMappingVO, segmentUpdateStatusMngr)
val metadataDetails = SegmentStatusManager.readTableStatusFile(
CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath,
carbonTable.getTableStatusVersion))
val isStandardTable = CarbonUtil.isStandardCarbonTable(carbonTable)
val rowContRdd =
sparkSession.sparkContext.parallelize(
blockMappingVO.getCompleteBlockRowDetailVO.asScala.toSeq,
keyRdd.partitions.length)
val conf = SparkSQLUtil
.broadCastHadoopConf(sparkSession.sparkContext, sparkSession.sessionState.newHadoopConf())
val rdd = rowContRdd.join(keyRdd)
val blockDetails = blockMappingVO.getBlockToSegmentMapping
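// executor side: for every block, write its delete delta (or mark the complete block
// for delete) and collect the resulting status and SegmentUpdateDetails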
res = rdd.mapPartitionsWithIndex(
(index: Int, records: Iterator[(String, (RowCountDetailsVO, Iterable[Row]))]) =>
Iterator[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))]] {
ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
var result = List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))]()
while (records.hasNext) {
val (key, (rowCountDetailsVO, groupedRows)) = records.next()
result = result ++
deleteDeltaFunc(index,
key,
groupedRows.toIterator,
timestamp,
rowCountDetailsVO,
isStandardTable,
metadataDetails
.find(_.getLoadName.equalsIgnoreCase(blockDetails.get(key)))
.get, carbonTable)
}
result
}).collect()
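// processes all delete rows of a single block: records the deleted
// (blockletId, offset, pageId) positions, writes the delete delta file and fills the
// SegmentUpdateDetails for that block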
def deleteDeltaFunc(index: Int,
key: String,
iter: Iterator[Row],
timestamp: String,
rowCountDetailsVO: RowCountDetailsVO,
isStandardTable: Boolean,
load: LoadMetadataDetails, carbonTable: CarbonTable
): Iterator[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))] = {
val result = new DeleteDeltaResultImpl()
var deleteStatus = SegmentStatus.LOAD_FAILURE
lazy val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
// key = segment/blockName for a standard table; for a Hive partition table the key
// is already the block name
var blockName = if (carbonTable.isHivePartitionTable) {
key
} else {
key.split(CarbonCommonConstants.FILE_SEPARATOR)(1)
}
blockName = blockName.replace(CarbonCommonConstants.UNDERSCORE, CarbonTablePath.BATCH_PREFIX)
blockName = CarbonUpdateUtil.getBlockName(CarbonTablePath.addDataPartPrefix(blockName))
val deleteDeltaBlockDetails: DeleteDeltaBlockDetails = new DeleteDeltaBlockDetails(blockName)
val resultIter =
new Iterator[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))] {
val segmentUpdateDetails = new SegmentUpdateDetails()
var TID = ""
var countOfRows = 0
try {
while (iter.hasNext) {
val oneRow = iter.next()
TID = oneRow
.get(oneRow.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).toString
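// decode offset, blocklet id and page id from the tuple id; partition tables and
// external segments use a different tuple id layout than standard tables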
val (offset, blockletId, pageId) = if (carbonTable.isHivePartitionTable) {
(CarbonUpdateUtil.getRequiredFieldFromTID(TID,
TupleIdEnum.OFFSET.getTupleIdIndex),
CarbonUpdateUtil.getRequiredFieldFromTID(TID,
TupleIdEnum.BLOCKLET_ID.getTupleIdIndex),
Integer.parseInt(CarbonUpdateUtil.getRequiredFieldFromTID(TID,
TupleIdEnum.PAGE_ID.getTupleIdIndex)))
} else if (TID.contains("#/") && load.getPath != null) {
// external segment case: the tuple id carries the external path separated by '#'
(CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.EXTERNAL_OFFSET),
CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.EXTERNAL_BLOCKLET_ID),
Integer.parseInt(CarbonUpdateUtil.getRequiredFieldFromTID(TID,
TupleIdEnum.EXTERNAL_PAGE_ID)))
} else {
(CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.OFFSET),
CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.BLOCKLET_ID),
Integer.parseInt(CarbonUpdateUtil.getRequiredFieldFromTID(TID,
TupleIdEnum.PAGE_ID)))
}
val isValidOffset = deleteDeltaBlockDetails.addBlocklet(blockletId, offset, pageId)
// abort the delete operation if the same row is matched by more than one input row
if (!isValidOffset) {
executorErrors.failureCauses = FailureCauses.MULTIPLE_INPUT_ROWS_MATCHING
executorErrors.errorMsg = "Multiple input rows matched for same row."
throw new MultipleMatchingException("Multiple input rows matched for same row.")
}
countOfRows = countOfRows + 1
}
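// resolve the physical block path: external segments store it in the load metadata,
// otherwise it is derived from the tuple id and the table path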
val blockPath =
if (StringUtils.isNotEmpty(load.getPath)) {
load.getPath
} else {
CarbonUpdateUtil.getTableBlockPath(TID,
tablePath,
isStandardTable,
carbonTable.isHivePartitionTable)
}
// get the compressor name
var columnCompressor: String = carbonTable.getTableInfo
.getFactTable
.getTableProperties
.get(CarbonCommonConstants.COMPRESSOR)
if (null == columnCompressor) {
columnCompressor = CompressorFactory.getInstance.getCompressor.getName
}
var blockNameFromTupleID =
if (TID.contains("#/") && load.getPath != null) {
CarbonUpdateUtil.getRequiredFieldFromTID(TID,
TupleIdEnum.EXTERNAL_BLOCK_ID)
} else {
CarbonUpdateUtil.getRequiredFieldFromTID(TID,
TupleIdEnum.BLOCK_ID)
}
blockNameFromTupleID = blockNameFromTupleID.replace(CarbonCommonConstants.UNDERSCORE,
CarbonTablePath.BATCH_PREFIX)
val completeBlockName = CarbonTablePath
.addDataPartPrefix(
blockNameFromTupleID + CarbonCommonConstants.POINT + columnCompressor +
CarbonCommonConstants.FACT_FILE_EXT)
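// register the new delete delta file for this block in the segment update metadata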
val deleteDeltaPath = CarbonUpdateUtil
.getDeleteDeltaFilePath(blockPath, blockName, timestamp)
val carbonDeleteWriter = new CarbonDeleteDeltaWriterImpl(deleteDeltaPath)
segmentUpdateDetails.setBlockName(blockName)
segmentUpdateDetails.setActualBlockName(completeBlockName)
segmentUpdateDetails.setSegmentName(load.getLoadName)
segmentUpdateDetails.setDeleteDeltaEndTimestamp(timestamp)
segmentUpdateDetails.setDeleteDeltaStartTimestamp(timestamp)
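// if every row of the block is now deleted, mark the whole block for delete instead
// of writing a delta file for it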
val alreadyDeletedRows: Long = rowCountDetailsVO.getDeletedRowsInBlock
val totalDeletedRows: Long = alreadyDeletedRows + countOfRows
segmentUpdateDetails.setDeletedRowsInBlock(totalDeletedRows.toString)
if (totalDeletedRows == rowCountDetailsVO.getTotalNumberOfRows) {
segmentUpdateDetails.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
} else {
// write the delta file
carbonDeleteWriter.write(deleteDeltaBlockDetails)
}
deleteStatus = SegmentStatus.SUCCESS
} catch {
case e: MultipleMatchingException =>
LOGGER.error(e.getMessage)
// do not rethrow: the failure cause has already been recorded in executorErrors
case e: Exception =>
val errorMsg = s"Delete data operation failed for ${database}.${tableName}."
LOGGER.error(errorMsg + e.getMessage)
throw e
}
var finished = false
// this iterator yields exactly one result tuple for the block
override def hasNext: Boolean = {
if (!finished) {
finished = true
true
} else {
false
}
}
override def next(): (SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long)) = {
finished = true
result.getKey(deleteStatus, (segmentUpdateDetails, executorErrors, countOfRows.toLong))
}
}
resultIter
}
(res, blockMappingVO)
}