in integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala [130:311]
  override def internalCompute(theSplit: Partition,
      context: TaskContext): Iterator[((K, V), String)] = {
    val queryStartTime = System.currentTimeMillis()
    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
    val iter: Iterator[((K, V), String)] = new Iterator[((K, V), String)] {
      val carbonSparkPartition: CarbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition]
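      // Work on a per-task copy of the load model so that task-local mutations (task number,
      // segment id, table path) do not leak into the shared driver-side load model.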
      val carbonLoadModelCopy: CarbonLoadModel = SecondaryIndexUtil
        .getCarbonLoadModel(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
          carbonLoadModel.getLoadMetadataDetails,
          carbonLoadModel.getFactTimeStamp,
          carbonLoadModel.getColumnCompressor)
      val indexTable = carbonLoadModelCopy.getCarbonDataLoadSchema.getCarbonTable
      carbonLoadModelCopy.setTablePath(indexTable.getTablePath)
      carbonLoadModelCopy.setTaskNo(String.valueOf(theSplit.index))
      var mergeStatus = false
      var mergeNumber = ""
      var exec: CarbonCompactionExecutor = _
      var processor: AbstractResultProcessor = _
      var rawResultIteratorMap: util.Map[String, util.List[RawResultIterator]] = _
      var segmentId: String = _
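      // Overall flow: sort the blocks of the source SI segment, run a compaction-style scan
      // over them and hand the resulting row iterators to a result processor that writes the
      // rebuilt secondary index data files.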
      try {
        // sorting the table block info List.
        val splitList = carbonSparkPartition.split.value.getAllSplits
        val tableBlockInfoList = CarbonInputSplit.createBlocks(splitList)
        segmentId = tableBlockInfoList.get(0).getSegmentId
        Collections.sort(tableBlockInfoList)
        // the latest column schema is read from the footer of the last block of the segment
        var dataFileFooter: DataFileFooter = null
        try {
          // As the tableBlockInfoList is sorted, take the footer from the last block of the
          // sorted list as it will have the latest column schema. Blocks are ordered by update
          // time through TableBlockInfo.compare, so the last block after the sort is the most
          // recently written one.
          dataFileFooter = CarbonUtil
            .readMetadataFile(tableBlockInfoList.get(tableBlockInfoList.size() - 1))
        } catch {
          case e: IOException =>
            logError("Exception in preparing the data file footer for compaction " + e.getMessage)
            throw e
        }
        // the target (merged) load name is the same as the source segment id, since the SI
        // rebuild rewrites data within the same segment
        carbonMergerMapping.mergedLoadName = tableBlockInfoList.get(0).getSegmentId
        carbonMergerMapping.maxSegmentColumnSchemaList = dataFileFooter.getColumnInTable.asScala
          .toList
        mergeNumber = tableBlockInfoList.get(0).getSegment.toString
        carbonLoadModelCopy.setSegmentId(mergeNumber)
        if (indexTable.isHivePartitionTable) {
          carbonLoadModelCopy.setTaskNo(
            CarbonScalaUtil.generateUniqueNumber(
              theSplit.index,
              mergeNumber.replace(".", ""), 0L))
        }
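        // configure the task-local temp store location; the result processors below use it for
        // their intermediate sort and merge output before the final files are written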
        CommonUtil.setTempStoreLocation(theSplit.index,
          carbonLoadModelCopy,
          isCompactionFlow = true,
          isAltPartitionFlow = false)
        // build the destination segment properties from the latest block's column schema
        // collected above
        val segmentProperties = new SegmentProperties(
          carbonMergerMapping.maxSegmentColumnSchemaList.asJava)
        val segmentMapping: java.util.Map[String, TaskBlockInfo] =
          CarbonCompactionUtil.createMappingForSegments(tableBlockInfoList)
        val dataFileMetadataSegMapping: java.util.Map[String, util.List[DataFileFooter]] =
          CarbonCompactionUtil.createDataFileFooterMappingForSegments(tableBlockInfoList,
            indexTable.getSortScope != SortScopeOptions.SortScope.NO_SORT)
        carbonLoadModelCopy.setTablePath(indexTablePath)
        // check for restructured block
        // TODO: only in case of add and drop this variable should be true
        val restructuredBlockExists: Boolean = CarbonCompactionUtil
          .checkIfAnyRestructuredBlockExists(segmentMapping,
            dataFileMetadataSegMapping,
            indexTable.getTableLastUpdatedTime)
        LOGGER.info(s"Restructured block exists: $restructuredBlockExists")
        DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
        exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties,
          indexTable, dataFileMetadataSegMapping, restructuredBlockExists,
          new SparkDataTypeConverterImpl)
        // add task completion listener to clean up the resources
        CarbonToSparkAdapter.addTaskCompletionListener(close())
        try {
          // fire a query and get the results.
          rawResultIteratorMap = exec.processTableBlocks(FileFactory.getConfiguration, null)
        } catch {
          case e: Throwable =>
            LOGGER.error(e)
            if (null != e.getMessage) {
              CarbonException.analysisException(
                s"Exception occurred in query execution :: ${ e.getMessage }")
            } else {
              CarbonException.analysisException(
                "Exception occurred in query execution. Please check logs.")
            }
        }
        val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(
          indexTable, carbonLoadModelCopy.getTaskNo, mergeNumber, true, false)
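        // choose the result processor: when the table is NO_SORT or every raw result iterator
        // is already sorted, rows can be merged directly; otherwise they have to be re-sorted
        // before the rebuilt SI files are written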
        if (indexTable.getSortScope == SortScopeOptions.SortScope.NO_SORT ||
            rawResultIteratorMap.get(CarbonCompactionUtil.UNSORTED_IDX).size() == 0) {
          LOGGER.info("RowResultMergerProcessor flow is selected")
          processor = new RowResultMergerProcessor(
            databaseName,
            indexTableName,
            segmentProperties,
            tempStoreLoc,
            carbonLoadModelCopy,
            carbonMergerMapping.compactionType,
            null)
        } else {
          LOGGER.info("CompactionResultSortProcessor flow is selected")
          processor = new CompactionResultSortProcessor(
            carbonLoadModelCopy,
            indexTable,
            segmentProperties,
            carbonMergerMapping.compactionType,
            indexTableName,
            null)
        }
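        // the processor consumes both iterator groups and writes the merged SI data files;
        // mergeStatus records whether the write succeeded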
        mergeStatus = processor.execute(
          rawResultIteratorMap.get(CarbonCompactionUtil.UNSORTED_IDX),
          rawResultIteratorMap.get(CarbonCompactionUtil.SORTED_IDX))
        mergeResult = tableBlockInfoList.get(0).getSegmentId + ',' + mergeNumber
      } catch {
        case e: Exception =>
          LOGGER.error("Compaction failed", e)
          throw e
      }
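      // invoked through the task completion listener registered above; the null checks make it
      // safe to call even when the try block failed before exec or processor were created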
      private def close(): Unit = {
        deleteLocalDataFolders()
        // close the query executor service and clean up memory acquired during query processing
        if (null != exec) {
          LOGGER.info("Cleaning up query resources acquired during compaction")
          exec.close(rawResultIteratorMap.get(CarbonCompactionUtil.UNSORTED_IDX), queryStartTime)
          exec.close(rawResultIteratorMap.get(CarbonCompactionUtil.SORTED_IDX), queryStartTime)
        }
        // clean up the resources for processor
        if (null != processor) {
          LOGGER.info("Closing compaction processor instance to clean up loading resources")
          processor.close()
        }
      }
      private def deleteLocalDataFolders(): Unit = {
        try {
          LOGGER.info("Deleting local folder store location")
          val isCompactionFlow = true
          TableProcessingOperations
            .deleteLocalDataLoadFolderLocation(carbonLoadModelCopy, isCompactionFlow, false)
        } catch {
          case e: Exception =>
            LOGGER.error(e)
        }
      }
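      // single-element iterator: each partition emits exactly one record holding the
      // (mergeResult, mergeStatus) key-value pair together with the source segment id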
      var finished = false
      override def hasNext: Boolean = {
        !finished
      }
      override def next(): ((K, V), String) = {
        finished = true
        (result.getKey(mergeResult, mergeStatus), segmentId)
      }
    }
    iter
  }