override def internalCompute()

in integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala [117:279]


  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
    val queryStartTime = System.currentTimeMillis()
    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
    val iter = new Iterator[(K, V)] {
      val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
      val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition]
      carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
      val partitionSpec = if (carbonTable.isHivePartitionTable) {
        carbonSparkPartition.partitionSpec.get
      } else {
        null
      }
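      // for bucketed tables, the partition index is used as the bucket id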
      val bucketId: Int = if (bucketInfo != null) {
        carbonSparkPartition.idx
      } else {
        0
      }
      carbonLoadModel.setBucketId(bucketId)

      var mergeStatus = false
      var mergeNumber = ""
      var exec: CarbonCompactionExecutor = null
      var processor: AbstractResultProcessor = null
      var rawResultIteratorMap: util.Map[String, util.List[RawResultIterator]] =
        new util.HashMap[String, util.List[RawResultIterator]]()
      try {
        val splitList = if (null == rangeColumn || singleRange) {
          // For a non-range column, or a single value inside the range column, use this
          // partition's own splits; only for a range column do we use the broadcast
          // splits (which contain all the splits).
          carbonSparkPartition.split.value.getAllSplits
        } else {
          broadCastSplits.value.getInputSplit
        }
        val tableBlockInfoList = CarbonInputSplit.createBlocks(splitList)

        // sort the table block info list
        Collections.sort(tableBlockInfoList)

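        // derive the merge number (the id of the new, merged segment) from the merged load name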
        mergeNumber = mergedLoadName.substring(
          mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
            CarbonCommonConstants.LOAD_FOLDER.length(),
          mergedLoadName.length()
        )
        carbonLoadModel.setSegmentId(mergeNumber)

        if (carbonTable.isHivePartitionTable) {
          carbonLoadModel.setTaskNo(
              CarbonScalaUtil.generateUniqueNumber(
                  theSplit.index,
                  mergeNumber.replace(".", ""), 0L))
        }

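        // set the local temporary store location for this task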
        CommonUtil.setTempStoreLocation(theSplit.index, carbonLoadModel, true, false)

        // get the destination segment properties sent from the driver, which belong to the last segment.
        val segmentProperties = new SegmentProperties(
          carbonMergerMapping.maxSegmentColumnSchemaList.asJava)

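        // map each segment id to its task-wise block infos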
        val segmentMapping: java.util.Map[String, TaskBlockInfo] =
          CarbonCompactionUtil.createMappingForSegments(tableBlockInfoList)

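        // map each segment id to its data file footers; the footers are used below to
        // detect restructured (add/drop column) blocks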
        val dataFileMetadataSegMapping: java.util.Map[String, List[DataFileFooter]] =
          CarbonCompactionUtil.createDataFileFooterMappingForSegments(tableBlockInfoList,
            carbonTable.getSortScope != SortScopeOptions.SortScope.NO_SORT)

        carbonLoadModel.setTablePath(tablePath)
        // check for restructured block
        // TODO: this variable should be true only in case of add and drop columns
        val restructuredBlockExists: Boolean = CarbonCompactionUtil
          .checkIfAnyRestructuredBlockExists(segmentMapping,
            dataFileMetadataSegMapping,
            carbonTable.getTableLastUpdatedTime)
        LOGGER.info(s"Restructured block exists: $restructuredBlockExists")
        DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
        exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties,
          carbonTable, dataFileMetadataSegMapping, restructuredBlockExists,
          new SparkDataTypeConverterImpl)

        // add task completion listener to clean up the resources
        context.addTaskCompletionListener {
          new CompactionTaskCompletionListener(carbonLoadModel,
            exec,
            processor,
            rawResultIteratorMap,
            segmentMetaDataAccumulator,
            queryStartTime
          )
        }
        try {
          // fire a query and get the results.
          var expr: expression.Expression = null
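          // for range-column compaction, each partition carries its own range filter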
          if (null != expressionMapForRangeCol) {
            expr = expressionMapForRangeCol.get(carbonSparkPartition.idx)
          }
          rawResultIteratorMap = exec.processTableBlocks(FileFactory.getConfiguration, expr)
        } catch {
          case e: Throwable =>
            LOGGER.error(e)
            if (null != e.getMessage) {
              CarbonException.analysisException(
                s"Exception occurred in query execution :: ${ e.getMessage }")
            } else {
              CarbonException.analysisException(
                "Exception occurred in query execution.Please check logs.")
            }
        }

        val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(
          carbonTable, carbonLoadModel.getTaskNo, mergeNumber, true, false)

        checkAndUpdatePartitionLocation(partitionSpec)
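        // Choose the merge flow: if the table is NO_SORT, or there are no unsorted
        // iterators (every block is already sorted), rows can be merged directly;
        // otherwise the results must be sorted during compaction.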
        if (carbonTable.getSortScope == SortScopeOptions.SortScope.NO_SORT ||
          rawResultIteratorMap.get(CarbonCompactionUtil.UNSORTED_IDX).size() == 0) {

          LOGGER.info("RowResultMergerProcessor flow is selected")
          processor = new RowResultMergerProcessor(
            databaseName,
            factTableName,
            segmentProperties,
            tempStoreLoc,
            carbonLoadModel,
            carbonMergerMapping.compactionType,
            partitionSpec)

        } else {

          LOGGER.info("CompactionResultSortProcessor flow is selected")
          processor = new CompactionResultSortProcessor(
            carbonLoadModel,
            carbonTable,
            segmentProperties,
            carbonMergerMapping.compactionType,
            factTableName,
            partitionSpec)

        }

        mergeStatus = processor.execute(
          rawResultIteratorMap.get(CarbonCompactionUtil.UNSORTED_IDX),
          rawResultIteratorMap.get(CarbonCompactionUtil.SORTED_IDX))
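        // the result reported to the driver is "<source segment id>,<merge number>"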
        mergeResult = tableBlockInfoList.get(0).getSegmentId + ',' + mergeNumber

      } catch {
        case e: Exception =>
          LOGGER.error("Compaction Failed ", e)
          throw e
      }

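      // the iterator yields exactly one element: this partition's merge result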
      var finished = false

      override def hasNext: Boolean = {
        !finished
      }

      override def next(): (K, V) = {
        finished = true
        result.getKey(mergeResult, mergeStatus)
      }
    }
    iter
  }
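
For context, a minimal sketch of how the per-partition results of this method are typically consumed on the driver side. It assumes a MergeResult[String, Boolean] key binding, where getKey returns a (mergeResult, mergeStatus) pair; mergerRDD and the failure handling here are illustrative, not taken from the source.

  // Each partition's iterator yields exactly one (K, V) pair, so collect()
  // returns one entry per merge task. Under the assumed binding, K is
  // "<source segment id>,<merge number>" and V is the merge status.
  val mergeResults: Array[(String, Boolean)] = mergerRDD.collect()
  if (mergeResults.exists { case (_, status) => !status }) {
    throw new RuntimeException("Compaction failed for one or more merge tasks")
  }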