override def internalCompute()

in integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala [130:311]
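
Task-side compute for the secondary-index rebuild: each Spark partition picks up the
blocks of one index-table segment, runs them through a compaction executor, merges
(and, where needed, re-sorts) the rows, and reports the merge status back to the
driver as a single-element iterator.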

  override def internalCompute(theSplit: Partition,
    context: TaskContext): Iterator[((K, V), String)] = {
    val queryStartTime = System.currentTimeMillis()
    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
    val iter: Iterator[((K, V), String)] = new Iterator[((K, V), String)] {
      val carbonSparkPartition: CarbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition]
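      // work on a task-local copy of the load model so per-task mutations (table path,
      // task no, segment id) do not interfere with other partitions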
      val carbonLoadModelCopy: CarbonLoadModel = SecondaryIndexUtil
        .getCarbonLoadModel(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
          carbonLoadModel.getLoadMetadataDetails,
          carbonLoadModel.getFactTimeStamp,
          carbonLoadModel.getColumnCompressor)
      val indexTable = carbonLoadModelCopy.getCarbonDataLoadSchema.getCarbonTable
      carbonLoadModelCopy.setTablePath(indexTable.getTablePath)
      carbonLoadModelCopy.setTaskNo(String.valueOf(theSplit.index))

      var mergeStatus = false
      var mergeNumber = ""
      var exec: CarbonCompactionExecutor = _
      var processor: AbstractResultProcessor = _
      var rawResultIteratorMap: util.Map[String, util.List[RawResultIterator]] = _
      var segmentId: String = _
      try {
        // collect the splits of this partition, convert them to table block infos and sort them
        val splitList = carbonSparkPartition.split.value.getAllSplits
        val tableBlockInfoList = CarbonInputSplit.createBlocks(splitList)
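        // every block of this partition is expected to come from the same segment, so the
        // first entry identifies it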
        segmentId = tableBlockInfoList.get(0).getSegmentId

        Collections.sort(tableBlockInfoList)

        // the column schema is read from the footer of the most recently updated block
        var dataFileFooter: DataFileFooter = null
        try {
          // tableBlockInfoList is sorted using the TableBlockInfo ordering, which reflects
          // update order, so the last block in the sorted list is the latest one and carries
          // the most recent column schema
          dataFileFooter = CarbonUtil
            .readMetadataFile(tableBlockInfoList.get(tableBlockInfoList.size() - 1))
        } catch {
          case e: IOException =>
            logError("Exception in preparing the data file footer for compaction " + e.getMessage)
            throw e
        }
        // target load name will be same as source load name in case of update data compaction
        carbonMergerMapping.mergedLoadName = tableBlockInfoList.get(0).getSegmentId
        carbonMergerMapping.maxSegmentColumnSchemaList = dataFileFooter.getColumnInTable.asScala
          .toList
        mergeNumber = tableBlockInfoList.get(0).getSegment.toString
        carbonLoadModelCopy.setSegmentId(mergeNumber)

        if (indexTable.isHivePartitionTable) {
          carbonLoadModelCopy.setTaskNo(
            CarbonScalaUtil.generateUniqueNumber(
              theSplit.index,
              mergeNumber.replace(".", ""), 0L))
        }

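        // register a task-specific local temp store location for intermediate files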
        CommonUtil.setTempStoreLocation(theSplit.index,
          carbonLoadModelCopy,
          isCompactionFlow = true,
          isAltPartitionFlow = false)

        // get destination segment properties as sent from driver which is of last segment.
        val segmentProperties = new SegmentProperties(
          carbonMergerMapping.maxSegmentColumnSchemaList.asJava)

        val segmentMapping: java.util.Map[String, TaskBlockInfo] =
          CarbonCompactionUtil.createMappingForSegments(tableBlockInfoList)

        val dataFileMetadataSegMapping: java.util.Map[String, util.List[DataFileFooter]] =
          CarbonCompactionUtil.createDataFileFooterMappingForSegments(tableBlockInfoList,
            indexTable.getSortScope != SortScopeOptions.SortScope.NO_SORT)

        carbonLoadModelCopy.setTablePath(indexTablePath)
        // check for restructured block
        // TODO: only in case of add and drop this variable should be true
        val restructuredBlockExists: Boolean = CarbonCompactionUtil
          .checkIfAnyRestructuredBlockExists(segmentMapping,
            dataFileMetadataSegMapping,
            indexTable.getTableLastUpdatedTime)
        LOGGER.info(s"Restructured block exists: $restructuredBlockExists")
        DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
        exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties,
          indexTable, dataFileMetadataSegMapping, restructuredBlockExists,
          new SparkDataTypeConverterImpl)

        // add task completion listener to clean up the resources
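        // (the listener fires on both task success and failure, so cleanup always runs)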
        CarbonToSparkAdapter.addTaskCompletionListener(close())
        try {
          // fire a query and get the results.
          rawResultIteratorMap = exec.processTableBlocks(FileFactory.getConfiguration, null)
        } catch {
          case e: Throwable =>
            LOGGER.error(e)
            if (null != e.getMessage) {
              CarbonException.analysisException(
                s"Exception occurred in query execution :: ${ e.getMessage }")
            } else {
              CarbonException.analysisException(
                "Exception occurred in query execution.Please check logs.")
            }
        }

        val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(
          indexTable, carbonLoadModelCopy.getTaskNo, mergeNumber, true, false)

        if (indexTable.getSortScope == SortScopeOptions.SortScope.NO_SORT ||
            rawResultIteratorMap.get(CarbonCompactionUtil.UNSORTED_IDX).size() == 0) {

          LOGGER.info("RowResultMergerProcessor flow is selected")
          processor = new RowResultMergerProcessor(
            databaseName,
            indexTableName,
            segmentProperties,
            tempStoreLoc,
            carbonLoadModelCopy,
            carbonMergerMapping.compactionType,
            null)

        } else {

          LOGGER.info("CompactionResultSortProcessor flow is selected")
          processor = new CompactionResultSortProcessor(
            carbonLoadModelCopy,
            indexTable,
            segmentProperties,
            carbonMergerMapping.compactionType,
            indexTableName,
            null)

        }

        mergeStatus = processor.execute(
          rawResultIteratorMap.get(CarbonCompactionUtil.UNSORTED_IDX),
          rawResultIteratorMap.get(CarbonCompactionUtil.SORTED_IDX))
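        // record the outcome as "<source segment id>,<merge number>" for the driver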
        mergeResult = tableBlockInfoList.get(0).getSegmentId + ',' + mergeNumber
      } catch {
        case e: Exception =>
          LOGGER.error("Compaction Failed ", e)
          throw e
      }

      private def close(): Unit = {
        deleteLocalDataFolders()
        // close the query executor service and free memory acquired during query processing;
        // the iterator map can still be null if the query failed before it was assigned
        if (null != exec && null != rawResultIteratorMap) {
          LOGGER.info("Cleaning up query resources acquired during compaction")
          exec.close(rawResultIteratorMap.get(CarbonCompactionUtil.UNSORTED_IDX), queryStartTime)
          exec.close(rawResultIteratorMap.get(CarbonCompactionUtil.SORTED_IDX), queryStartTime)
        }
        // clean up the resources for processor
        if (null != processor) {
          LOGGER.info("Closing compaction processor instance to clean up loading resources")
          processor.close()
        }
      }

      private def deleteLocalDataFolders(): Unit = {
        try {
          LOGGER.info("Deleting local folder store location")
          val isCompactionFlow = true
          TableProcessingOperations
            .deleteLocalDataLoadFolderLocation(carbonLoadModelCopy, isCompactionFlow, false)
        } catch {
          case e: Exception =>
            LOGGER.error(e)
        }
      }

      var finished = false

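      // single-element iterator: each task emits exactly one merge-status tuple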
      override def hasNext: Boolean = !finished

      override def next(): ((K, V), String) = {
        finished = true
        (result.getKey(mergeResult, mergeStatus), segmentId)
      }
    }
    iter
  }
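
For orientation, a minimal driver-side sketch of how the per-partition results could be
consumed, assuming K = String and V = Boolean for the merge key and status; the names
handleMergeStatus and rebuildRDD are illustrative, not the actual CarbonData call site:

  import org.apache.spark.rdd.RDD

  // hypothetical helper, not part of CarbonData: gather the single ((key, status), segmentId)
  // tuple emitted by each partition and fail fast if any partition reported a failed merge
  def handleMergeStatus(rebuildRDD: RDD[((String, Boolean), String)]): Unit = {
    val failedSegments = rebuildRDD.collect().collect {
      case ((_, mergeStatus), segmentId) if !mergeStatus => segmentId
    }
    if (failedSegments.nonEmpty) {
      throw new RuntimeException(
        s"Merging data files failed for segments: ${failedSegments.mkString(", ")}")
    }
  }

Because hasNext flips to false after the first next(), collect() returns exactly one
status tuple per partition.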