in integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala [117:279]
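// Runs compaction for a single Spark partition: scans the blocks of the segments
// being merged, merges (and re-sorts if required) the rows, and writes them into
// the destination segment, returning one (key, status) record.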
override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
val queryStartTime = System.currentTimeMillis()
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
val iter = new Iterator[(K, V)] {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition]
carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
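// for Hive partitioned tables the target partition spec travels with the Spark partition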
val partitionSpec = if (carbonTable.isHivePartitionTable) {
carbonSparkPartition.partitionSpec.get
} else {
null
}
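// for bucketed tables each Spark partition maps to exactly one bucket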
val bucketId: Int = if (bucketInfo != null) {
carbonSparkPartition.idx
} else {
0
}
carbonLoadModel.setBucketId(bucketId)
var mergeStatus = false
var mergeNumber = ""
var exec: CarbonCompactionExecutor = null
var processor: AbstractResultProcessor = null
var rawResultIteratorMap: util.Map[String, util.List[RawResultIterator]] =
new util.HashMap[String, util.List[RawResultIterator]]()
try {
// pick the splits to merge. For a non-range column, or a single value inside
// the range column, each partition carries its own splits; only for a range
// column do we use the broadcast splits (which contain all the splits).
val splitList = if (null == rangeColumn || singleRange) {
carbonSparkPartition.split.value.getAllSplits
} else {
broadCastSplits.value.getInputSplit
}
// sort the table block info list
val tableBlockInfoList = CarbonInputSplit.createBlocks(splitList)
Collections.sort(tableBlockInfoList)
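// derive the new segment id (the "merge number", e.g. "0.1") from the merged load folder name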
mergeNumber = mergedLoadName.substring(
mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
CarbonCommonConstants.LOAD_FOLDER.length(),
mergedLoadName.length()
)
carbonLoadModel.setSegmentId(mergeNumber)
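// for partitioned tables derive a task number that is unique for this split and
// segment, so that output file names do not clash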
if (carbonTable.isHivePartitionTable) {
carbonLoadModel.setTaskNo(
CarbonScalaUtil.generateUniqueNumber(
theSplit.index,
mergeNumber.replace(".", ""), 0L))
}
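// set up the local temp store for this task's intermediate files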
CommonUtil.setTempStoreLocation(theSplit.index, carbonLoadModel, true, false)
// get the destination segment properties as sent from the driver (taken from the last segment).
val segmentProperties = new SegmentProperties(
carbonMergerMapping.maxSegmentColumnSchemaList.asJava)
val segmentMapping: java.util.Map[String, TaskBlockInfo] =
CarbonCompactionUtil.createMappingForSegments(tableBlockInfoList)
val dataFileMetadataSegMapping: java.util.Map[String, List[DataFileFooter]] =
CarbonCompactionUtil.createDataFileFooterMappingForSegments(tableBlockInfoList,
carbonTable.getSortScope != SortScopeOptions.SortScope.NO_SORT)
carbonLoadModel.setTablePath(tablePath)
// check for restructured block
// TODO: this variable should be true only in case of add and drop column
val restructuredBlockExists: Boolean = CarbonCompactionUtil
.checkIfAnyRestructuredBlockExists(segmentMapping,
dataFileMetadataSegMapping,
carbonTable.getTableLastUpdatedTime)
LOGGER.info(s"Restructured block exists: $restructuredBlockExists")
DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
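// the compaction executor scans the blocks of the segments being merged and
// returns raw result iterators grouped as sorted/unsorted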
exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties,
carbonTable, dataFileMetadataSegMapping, restructuredBlockExists,
new SparkDataTypeConverterImpl)
// add a task completion listener to clean up the resources
context.addTaskCompletionListener {
new CompactionTaskCompletionListener(carbonLoadModel,
exec,
processor,
rawResultIteratorMap,
segmentMetaDataAccumulator,
queryStartTime
)
}
try {
// fire a query and get the results.
var expr: expression.Expression = null
if (null != expressionMapForRangeCol) {
expr = expressionMapForRangeCol
.get(theSplit.asInstanceOf[CarbonSparkPartition].idx)
}
rawResultIteratorMap = exec.processTableBlocks(FileFactory.getConfiguration, expr)
} catch {
case e: Throwable =>
LOGGER.error(e)
if (null != e.getMessage) {
CarbonException.analysisException(
s"Exception occurred in query execution :: ${ e.getMessage }")
} else {
CarbonException.analysisException(
"Exception occurred in query execution.Please check logs.")
}
}
val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(
carbonTable, carbonLoadModel.getTaskNo, mergeNumber, true, false)
checkAndUpdatePartitionLocation(partitionSpec)
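// choose the merge flow: when the table is NO_SORT, or every result iterator is
// already sorted, the rows can be merged directly; otherwise the unsorted rows
// must go through the sort processor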
if (carbonTable.getSortScope == SortScopeOptions.SortScope.NO_SORT ||
rawResultIteratorMap.get(CarbonCompactionUtil.UNSORTED_IDX).size() == 0) {
LOGGER.info("RowResultMergerProcessor flow is selected")
processor = new RowResultMergerProcessor(
databaseName,
factTableName,
segmentProperties,
tempStoreLoc,
carbonLoadModel,
carbonMergerMapping.compactionType,
partitionSpec)
} else {
LOGGER.info("CompactionResultSortProcessor flow is selected")
processor = new CompactionResultSortProcessor(
carbonLoadModel,
carbonTable,
segmentProperties,
carbonMergerMapping.compactionType,
factTableName,
partitionSpec)
}
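// hand both the unsorted and sorted iterators to the selected processor, which
// writes the merged rows into the destination segment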
mergeStatus = processor.execute(
rawResultIteratorMap.get(CarbonCompactionUtil.UNSORTED_IDX),
rawResultIteratorMap.get(CarbonCompactionUtil.SORTED_IDX))
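// the result reported to the driver: "<source segment id>,<new segment id>"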
mergeResult = tableBlockInfoList.get(0).getSegmentId + ',' + mergeNumber
} catch {
case e: Exception =>
LOGGER.error("Compaction Failed ", e)
throw e
}
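// the RDD contract expects an iterator; compaction emits exactly one
// (key, status) record per partition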
var finished = false
override def hasNext: Boolean = {
!finished
}
override def next(): (K, V) = {
finished = true
result.getKey(mergeResult, mergeStatus)
}
}
iter
}