in integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala [159:324]
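/**
 * Merges the data files of the given SI segments, rewrites their index and segment files,
 * and returns the set of segment ids that were rebuilt.
 */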
private def triggerCompaction(compactionCallableModel: CompactionCallableModel,
    segmentIdToLoadStartTimeMapping: scala.collection.mutable.Map[String, java.lang.Long])
  (sqlContext: SQLContext): Set[String] = {
  val indexCarbonTable = compactionCallableModel.carbonTable
  val sc = compactionCallableModel.sqlContext
  val carbonLoadModel = compactionCallableModel.carbonLoadModel
  val compactionType = compactionCallableModel.compactionType
  val partitions = compactionCallableModel.currentPartitions
  val tablePath = indexCarbonTable.getTablePath
  val startTime = System.nanoTime()
  var finalMergeStatus = false
  val databaseName: String = indexCarbonTable.getDatabaseName
  val factTableName = indexCarbonTable.getTableName
  val validSegments: util.List[Segment] = CarbonDataMergerUtil
    .getValidSegments(compactionCallableModel.loadsToMerge)
  val mergedLoadName: String = ""
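  // Build the merger mapping that drives the SI data file merge; no merged load name is
  // needed since the rewritten files stay inside the existing SI segments.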
  val carbonMergerMapping = CarbonMergerMapping(
    tablePath,
    indexCarbonTable.getMetadataPath,
    mergedLoadName,
    databaseName,
    factTableName,
    validSegments.asScala.toArray,
    indexCarbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
    compactionType,
    maxSegmentColumnSchemaList = null,
    currentPartitions = partitions)
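  // Point the load model at the SI table and refresh its load metadata before merging.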
  carbonLoadModel.setTablePath(carbonMergerMapping.hdfsStoreLocation)
  carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager.readLoadMetadata(
    indexCarbonTable.getMetadataPath, indexCarbonTable.getTableStatusVersion).toList.asJava)
  val mergedSegments: util.Set[LoadMetadataDetails] = new util.HashSet[LoadMetadataDetails]()
  var rebuiltSegments: Set[String] = Set[String]()
  val segmentIdToLoadStartTimeMap: util.Map[String, String] = new util.HashMap()
  try {
    var siRebuildRDD: CarbonSIRebuildRDD[String, Boolean] = null
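    // Global sort SI tables are merged via mergeSISegmentDataFiles; all other tables are
    // rebuilt segment by segment through CarbonSIRebuildRDD.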
    val mergeStatus = if (SortScope.GLOBAL_SORT == indexCarbonTable.getSortScope &&
                          !indexCarbonTable.getSortColumns.isEmpty) {
      mergeSISegmentDataFiles(sc.sparkSession, carbonLoadModel, carbonMergerMapping)
    } else {
      siRebuildRDD = new CarbonSIRebuildRDD(sc.sparkSession,
        new MergeResultImpl(),
        carbonLoadModel,
        carbonMergerMapping)
      siRebuildRDD.collect
    }
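    // An empty merge status means there was nothing to rewrite; otherwise the merge is
    // successful only if every task succeeded, and the rebuilt segment ids are collected.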
    if (null != mergeStatus && mergeStatus.length == 0) {
      finalMergeStatus = true
    } else {
      finalMergeStatus = mergeStatus.forall(_._1._2)
      rebuiltSegments = mergeStatus.map(_._2).toSet
      compactionCallableModel.loadsToMerge.asScala.foreach(metadataDetails => {
        if (rebuiltSegments.contains(metadataDetails.getLoadName)) {
          mergedSegments.add(metadataDetails)
          segmentIdToLoadStartTimeMap
            .put(metadataDetails.getLoadName, String.valueOf(metadataDetails.getLoadStartTime))
        }
      })
    }
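    // Clean up old files and commit the new ones only when data files were actually rewritten.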
    if (finalMergeStatus) {
      if (null != mergeStatus && mergeStatus.length != 0) {
        // The segment file is not yet written during SI load; in that case the old index files
        // can be deleted immediately. If the segment file is already present and an SI refresh
        // is triggered, do not delete immediately to avoid failures during parallel queries.
        val validSegmentsToUse = validSegments.asScala
          .filter(segment => {
            val segmentFilePath = CarbonTablePath.getSegmentFilesLocation(tablePath) +
              CarbonCommonConstants.FILE_SEPARATOR +
              segment.getSegmentFileName
            mergeStatus.map(_._2).toSet.contains(segment.getSegmentNo) &&
              !FileFactory.isFileExist(segmentFilePath)
          })
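        // Delete the stale index / merge index files of the rewritten segments.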
        deleteOldIndexOrMergeIndexFiles(
          carbonLoadModel.getFactTimeStamp,
          validSegmentsToUse.toList.asJava,
          indexCarbonTable)
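        // Delete the old data files replaced by the merge: for global sort the valid segments
        // are enough to locate them, otherwise the RDD partitions are used.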
        if (SortScope.GLOBAL_SORT == indexCarbonTable.getSortScope &&
            !indexCarbonTable.getSortColumns.isEmpty) {
          deleteOldCarbonDataFiles(carbonLoadModel.getFactTimeStamp,
            validSegmentsToUse.toList.asJava,
            indexCarbonTable)
        } else {
          siRebuildRDD.partitions.foreach { partition =>
            val carbonSparkPartition = partition.asInstanceOf[CarbonSparkPartition]
            deleteOldCarbonDataFiles(carbonSparkPartition, validSegmentsToUse.toList)
          }
        }
        val segmentToLoadStartTimeMap: scala.collection.mutable.Map[String, java.lang.Long] =
          scala.collection.mutable.Map()
        // merge index files and write segment file for merged segments
        mergedSegments.asScala.map { seg =>
          val segmentPath = CarbonTablePath.getSegmentPath(tablePath, seg.getLoadName)
          try {
            new CarbonIndexFileMergeWriter(indexCarbonTable)
              .writeMergeIndexFileBasedOnSegmentFolder(null, false, segmentPath,
                seg.getLoadName, carbonLoadModel.getFactTimeStamp.toString,
                true)
          } catch {
            case e: IOException =>
              val message =
                s"Failed to merge index files in path: $segmentPath. ${ e.getMessage() }"
              LOGGER.error(message)
              throw new RuntimeException(message, e)
          }
          val file = SegmentFileStore.writeSegmentFile(
            indexCarbonTable,
            seg.getLoadName,
            carbonLoadModel.getFactTimeStamp.toString,
            null,
            null)
          segmentToLoadStartTimeMap.put(seg.getLoadName,
            carbonLoadModel.getFactTimeStamp)
          // clear the indexSchema cache for the merged segments, as the index files and
          // data files are rewritten after compaction
          if (CarbonProperties.getInstance()
            .isDistributedPruningEnabled(indexCarbonTable.getDatabaseName,
              indexCarbonTable.getTableName)) {
            try {
              IndexServer.getClient
                .invalidateSegmentCache(indexCarbonTable,
                  rebuiltSegments.toArray,
                  SparkSQLUtil.getTaskGroupId(sc.sparkSession))
            } catch {
              case _: Exception =>
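                // Invalidating the index server cache is best effort; failures are ignored.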
            }
          }
          val segment = new Segment(seg.getLoadName, file)
          segment
        }
        // Table status update is skipped here for the compaction flow: for a compaction call
        // the status is updated to Compacted/Success directly by CarbonTableCompactor.
        if (compactionType == null) {
          FileInternalUtil.updateTableStatus(
            rebuiltSegments.toList,
            carbonLoadModel,
            indexCarbonTable.getTableName,
            SegmentStatus.SUCCESS,
            segmentToLoadStartTimeMap,
            new java.util.HashMap[String, String],
            indexCarbonTable,
            sc.sparkSession,
            carbonLoadModel.getFactTimeStamp,
            rebuiltSegments)
          IndexStoreManager.getInstance
            .clearInvalidSegments(indexCarbonTable, rebuiltSegments.toList.asJava)
        }
      }
      val endTime = System.nanoTime()
      LOGGER.info(s"Time taken to merge (in nano): ${endTime - startTime}")
      LOGGER.info(s"Merge data files request completed for table " +
        s"${indexCarbonTable.getDatabaseName}.${indexCarbonTable.getTableName}")
      rebuiltSegments
    } else {
      LOGGER.error(s"Merge data files request failed for table " +
        s"${indexCarbonTable.getDatabaseName}.${indexCarbonTable.getTableName}")
      throw new Exception("Merge data files failure in merger RDD.")
    }
  } catch {
    case e: Exception =>
      LOGGER.error(s"Merge data files request failed for table " +
        s"${indexCarbonTable.getDatabaseName}.${indexCarbonTable.getTableName}")
      throw new Exception("Merge data files failure in merger RDD.", e)
  }
}