in integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala [204:479]
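/**
 * Performs the merge of the loads selected for compaction: runs the merger
 * (global-sort compaction or CarbonMergerRDD), writes the merged segment's
 * segment file, updates the table status and fires the compaction pre/post events.
 */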
private def triggerCompaction(compactionCallableModel: CompactionCallableModel,
    mergedLoadName: String): Unit = {
  val carbonTable = compactionCallableModel.carbonTable
  val loadsToMerge = compactionCallableModel.loadsToMerge
  val sc = compactionCallableModel.sqlContext
  val carbonLoadModel = compactionCallableModel.carbonLoadModel
  val compactionType = compactionCallableModel.compactionType
  val partitions = compactionCallableModel.currentPartitions
  val tablePath = carbonLoadModel.getTablePath
  val startTime = System.nanoTime()
  val mergedLoads = compactionCallableModel.compactedSegments
  mergedLoads.add(mergedLoadName)
  var finalMergeStatus = false
  val databaseName: String = carbonLoadModel.getDatabaseName
  val factTableName = carbonLoadModel.getTableName
  val validSegments: List[Segment] = CarbonDataMergerUtil.getValidSegments(loadsToMerge)
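  // describe the merge: source segments, target load name and compaction type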
  val carbonMergerMapping = CarbonMergerMapping(
    tablePath,
    carbonTable.getMetadataPath,
    mergedLoadName,
    databaseName,
    factTableName,
    validSegments.asScala.toArray,
    carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
    compactionType,
    maxSegmentColumnSchemaList = null,
    currentPartitions = partitions)
  carbonLoadModel.setTablePath(carbonMergerMapping.hdfsStoreLocation)
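  // refresh the load model with the latest table status entries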
  val tblStatusVersion = carbonLoadModel.getCarbonDataLoadSchema
    .getCarbonTable.getTableStatusVersion
  carbonLoadModel.setLoadMetadataDetails(
    SegmentStatusManager
      .readLoadMetadata(carbonTable.getMetadataPath, tblStatusVersion).toList.asJava)
  // trigger event for compaction
  val alterTableCompactionPreEvent: AlterTableCompactionPreEvent =
    AlterTableCompactionPreEvent(sqlContext.sparkSession,
      carbonTable,
      carbonMergerMapping,
      mergedLoadName)
  OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext)
  // fire the pre-execution event for the table's CG and FG index schemas
  val tableIndexes = IndexStoreManager.getInstance().getAllCGAndFGIndexes(carbonTable)
  val indexOperationContext = new OperationContext()
  if (null != tableIndexes) {
    val indexNames: mutable.Buffer[String] =
      tableIndexes.asScala.map(index => index.getIndexSchema.getIndexName)
    val indexPreExecutionEvent: BuildIndexPreExecutionEvent =
      new BuildIndexPreExecutionEvent(sqlContext.sparkSession,
        carbonTable.getAbsoluteTableIdentifier, indexNames)
    OperationListenerBus.getInstance().fireEvent(indexPreExecutionEvent,
      indexOperationContext)
  }
  // accumulator to collect segment metadata
  val segmentMetaDataAccumulator = sqlContext
    .sparkContext
    .collectionAccumulator[Map[String, SegmentMetaDataInfo]]
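  // partition specs located outside the table path whose locations must be
  // re-pointed to the table path after the merge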
  val updatePartitionSpecs: List[PartitionSpec] = new util.ArrayList[PartitionSpec]
  var mergeRDD: CarbonMergerRDD[String, Boolean] = null
  if (carbonTable.isHivePartitionTable) {
    // collect related partitions
    mergeRDD = new CarbonMergerRDD(
      sc.sparkSession,
      new MergeResultImpl(),
      carbonLoadModel,
      carbonMergerMapping,
      segmentMetaDataAccumulator
    )
    val partitionSpecs = mergeRDD.getPartitions.map { partition =>
      partition.asInstanceOf[CarbonSparkPartition].partitionSpec.get
    }.distinct
    if (partitionSpecs != null && partitionSpecs.nonEmpty) {
      compactionCallableModel.compactedPartitions = Some(partitionSpecs)
    }
    partitionSpecs.foreach(partitionSpec => {
      if (!partitionSpec.getLocation.toString.startsWith(carbonLoadModel.getTablePath)) {
        // if the partition spec points to an external path, its location has to be
        // updated to the table path after compaction.
        updatePartitionSpecs.add(partitionSpec)
      }
    })
  }
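  // choose the merge path: global-sort compaction for globally sorted standard
  // tables without a range column, otherwise an ordinary CarbonMergerRDD run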
  val mergeStatus =
    if (SortScope.GLOBAL_SORT == carbonTable.getSortScope &&
        !carbonTable.getSortColumns.isEmpty &&
        carbonTable.getRangeColumn == null &&
        CarbonUtil.isStandardCarbonTable(carbonTable)) {
      compactSegmentsByGlobalSort(sc.sparkSession,
        carbonLoadModel,
        carbonMergerMapping,
        segmentMetaDataAccumulator)
    } else {
      if (mergeRDD != null) {
        val result = mergeRDD.collect
        if (!updatePartitionSpecs.isEmpty) {
          val tableIdentifier = new TableIdentifier(carbonTable.getTableName,
            Some(carbonTable.getDatabaseName))
          val partitionSpecs = updatePartitionSpecs.asScala.map {
            partitionSpec =>
              // replaces old partitionSpec with updated partitionSpec
              mergeRDD.checkAndUpdatePartitionLocation(partitionSpec)
              PartitioningUtils.parsePathFragment(
                String.join(CarbonCommonConstants.FILE_SEPARATOR, partitionSpec.getPartitions))
          }
          // To update partitionSpec in hive metastore, drop and add with latest path.
          AlterTableDropPartitionCommand(
            tableIdentifier,
            partitionSpecs,
            true, false, true).run(sqlContext.sparkSession)
          AlterTableAddPartitionCommand(tableIdentifier,
            partitionSpecs.map(p => (p, None)), false).run(sqlContext.sparkSession)
        }
        result
      } else {
        new CarbonMergerRDD(
          sc.sparkSession,
          new MergeResultImpl(),
          carbonLoadModel,
          carbonMergerMapping,
          segmentMetaDataAccumulator
        ).collect
      }
    }
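  // every merge task must report success; an empty result counts as failure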
  finalMergeStatus = mergeStatus.nonEmpty && mergeStatus.forall(_._2)
  if (finalMergeStatus) {
    val mergedLoadNumber = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName)
    var segmentFileName: String = null
    val isMergeIndex = CarbonProperties.getInstance().getProperty(
      CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
      CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean
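    // merge the per-block index files of the merged segment into a single
    // merge-index file; skipped for IUD delete delta compaction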
    if (compactionType != CompactionType.IUD_DELETE_DELTA && isMergeIndex) {
      MergeIndexUtil.mergeIndexFilesOnCompaction(compactionCallableModel)
    }
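    // finalize the merged segment's segment file: for partition tables the temp
    // file written with the fact timestamp is renamed (or the partition files
    // merged); for normal tables a fresh segment file is written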
    if (carbonTable.isHivePartitionTable) {
      if (isMergeIndex) {
        val segmentTmpFileName = carbonLoadModel.getFactTimeStamp + CarbonTablePath.SEGMENT_EXT
        segmentFileName = mergedLoadNumber + CarbonCommonConstants.UNDERSCORE + segmentTmpFileName
        val segmentTmpFile = FileFactory.getCarbonFile(
          CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath, segmentTmpFileName))
        if (!segmentTmpFile.renameForce(
          CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath, segmentFileName))) {
          throw new Exception(s"Rename segment file from ${segmentTmpFileName} " +
            s"to ${segmentFileName} failed.")
        }
      } else {
        // By default carbon.merge.index.in.segment is true and this code will be used for
        // developer debugging purpose.
        val readPath =
          CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath) +
            CarbonCommonConstants.FILE_SEPARATOR + carbonLoadModel.getFactTimeStamp + ".tmp"
        // Merge all partition files into a single file.
        segmentFileName =
          mergedLoadNumber + CarbonCommonConstants.UNDERSCORE + carbonLoadModel.getFactTimeStamp
        val mergedSegmentFile = SegmentFileStore
          .mergeSegmentFiles(readPath,
            segmentFileName,
            CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath))
        if (mergedSegmentFile != null) {
          SegmentFileStore
            .moveFromTempFolder(mergedSegmentFile,
              carbonLoadModel.getFactTimeStamp + ".tmp",
              carbonLoadModel.getTablePath)
        }
        segmentFileName = segmentFileName + CarbonTablePath.SEGMENT_EXT
      }
    } else {
      // Get the segment file for each updated segment in case of IUD compaction
      val segmentMetaDataInfo = CommonLoadUtils.getSegmentMetaDataInfoFromAccumulator(
        mergedLoadNumber,
        segmentMetaDataAccumulator)
      segmentFileName = SegmentFileStore.writeSegmentFile(
        carbonTable,
        mergedLoadNumber,
        carbonLoadModel.getFactTimeStamp.toString,
        segmentMetaDataInfo)
    }
    // clear segmentMetaDataAccumulator
    segmentMetaDataAccumulator.reset()
    // Used to inform the commit listener that the commit is fired from compaction flow.
    operationContext.setProperty("isCompaction", "true")
    // trigger event for compaction
    val alterTableCompactionPreStatusUpdateEvent =
      AlterTableCompactionPreStatusUpdateEvent(sc.sparkSession,
        carbonTable,
        carbonMergerMapping,
        carbonLoadModel,
        mergedLoadName)
    OperationListenerBus.getInstance
      .fireEvent(alterTableCompactionPreStatusUpdateEvent, operationContext)
    val endTime = System.nanoTime()
    LOGGER.info(s"time taken to merge $mergedLoadName is ${ endTime - startTime } ns")
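    // mark the source segments as compacted and register the merged segment
    // in the table status file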
    val statusFileUpdate =
      CarbonDataMergerUtil.updateLoadMetadataWithMergeStatus(
        loadsToMerge,
        carbonTable.getMetadataPath,
        mergedLoadNumber,
        carbonLoadModel,
        compactionType,
        segmentFileName,
        MVManagerInSpark.get(sc.sparkSession))
    if (!statusFileUpdate) {
      LOGGER.error(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
        s"${ carbonLoadModel.getTableName }")
      throw new Exception(s"Compaction failed to update metadata for table" +
        s" ${ carbonLoadModel.getDatabaseName }." +
        s"${ carbonLoadModel.getTableName }")
    }
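    // record the latest table status write version for the table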
    CarbonHiveIndexMetadataUtil.updateTableStatusVersion(carbonTable,
      sc.sparkSession,
      carbonLoadModel.getLatestTableStatusWriteVersion)
    val compactionLoadStatusPostEvent = AlterTableCompactionPostStatusUpdateEvent(sc.sparkSession,
      carbonTable,
      carbonMergerMapping,
      carbonLoadModel,
      mergedLoadName)
    OperationListenerBus.getInstance()
      .fireEvent(compactionLoadStatusPostEvent, operationContext)
    if (null != tableIndexes) {
      val buildIndexPostExecutionEvent = new BuildIndexPostExecutionEvent(
        sqlContext.sparkSession, carbonTable.getAbsoluteTableIdentifier,
        null, Seq(mergedLoadNumber), true)
      OperationListenerBus.getInstance()
        .fireEvent(buildIndexPostExecutionEvent, indexOperationContext)
    }
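    // listeners fired above may set "commitComplete"; if none did, assume success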
    val commitDone = operationContext.getProperty("commitComplete")
    val commitComplete = if (null != commitDone) {
      commitDone.toString.toBoolean
    } else {
      true
    }
    if (!commitComplete) {
      LOGGER.error(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
        s"${ carbonLoadModel.getTableName }")
      throw new Exception(s"Compaction failed to update metadata for table" +
        s" ${ carbonLoadModel.getDatabaseName }." +
        s"${ carbonLoadModel.getTableName }")
    } else {
      LOGGER.info(s"Compaction request completed for table " +
        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
      // Pre-priming index for compaction
      val segmentsForPriming = if (compactionType.equals(CompactionType.IUD_DELETE_DELTA)) {
        validSegments.asScala.map(_.getSegmentNo).toList
      } else if (compactionType.equals(CompactionType.MAJOR) ||
                 compactionType.equals(CompactionType.MINOR) ||
                 compactionType.equals(CompactionType.CUSTOM)) {
        scala.List(mergedLoadNumber)
      } else {
        scala.List()
      }
      DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession,
        carbonTable,
        validSegments.asScala.map(_.getSegmentNo).toList,
        operationContext,
        FileFactory.getConfiguration,
        segmentsForPriming)
    }
  } else {
    LOGGER.error(s"Compaction request failed for table " +
      s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
    throw new Exception("Compaction Failure in Merger Rdd.")
  }
}