private def triggerCompaction()

in integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala [204:479]


  /**
   * Merges the segments listed in `compactionCallableModel.loadsToMerge` into the load named
   * `mergedLoadName` and, if every merge task reports success, commits the result.
   *
   * Order of work as written below:
   *  1. Build the `CarbonMergerMapping` and reload the load metadata details into the load model.
   *  2. Fire `AlterTableCompactionPreEvent` and, when indexes are returned for the table,
   *     `BuildIndexPreExecutionEvent`.
   *  3. Run the merge, either through `compactSegmentsByGlobalSort` or by collecting a
   *     `CarbonMergerRDD`.
   *  4. On success: optionally merge index files, produce the segment file, fire the
   *     pre-status-update event, update the table status, fire the post events and trigger
   *     index pre-priming.
   *
   * Side effects visible in this method: `mergedLoadName` is added to
   * `compactionCallableModel.compactedSegments`, `compactionCallableModel.compactedPartitions`
   * may be set, `carbonLoadModel` is mutated (table path and load metadata details) and the
   * "isCompaction" property is set on `operationContext`.
   *
   * `sqlContext`, `operationContext`, `LOGGER` and `compactSegmentsByGlobalSort` are declared
   * outside this method and are not visible in this block.
   *
   * @param compactionCallableModel carries the table, the loads to merge, the load model, the
   *                                compaction type and the current partitions read below
   * @param mergedLoadName          name of the load that receives the merged data
   * @throws Exception when the merge result is empty or contains a failed task, when renaming
   *                   the temporary segment file fails, when the table status update returns
   *                   false, or when the "commitComplete" property is set to false
   */
  private def triggerCompaction(compactionCallableModel: CompactionCallableModel,
      mergedLoadName: String): Unit = {
    val carbonTable = compactionCallableModel.carbonTable
    val loadsToMerge = compactionCallableModel.loadsToMerge
    // NOTE(review): despite its name, `sc` is the model's sqlContext, not a SparkContext. The
    // method also uses a `sqlContext` declared outside this block; presumably both refer to the
    // same instance - confirm.
    val sc = compactionCallableModel.sqlContext
    val carbonLoadModel = compactionCallableModel.carbonLoadModel
    val compactionType = compactionCallableModel.compactionType
    val partitions = compactionCallableModel.currentPartitions
    val tablePath = carbonLoadModel.getTablePath
    // Start of the interval reported in the "time taken to merge" log line further down.
    val startTime = System.nanoTime()
    // The merged load name is recorded before the merge runs; nothing in this method removes
    // it again if the merge fails later.
    val mergedLoads = compactionCallableModel.compactedSegments
    mergedLoads.add(mergedLoadName)
    var finalMergeStatus = false
    val databaseName: String = carbonLoadModel.getDatabaseName
    val factTableName = carbonLoadModel.getTableName
    val validSegments: List[Segment] = CarbonDataMergerUtil.getValidSegments(loadsToMerge)
    val carbonMergerMapping = CarbonMergerMapping(
      tablePath,
      carbonTable.getMetadataPath,
      mergedLoadName,
      databaseName,
      factTableName,
      validSegments.asScala.toArray,
      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
      compactionType,
      maxSegmentColumnSchemaList = null,
      currentPartitions = partitions)
    // NOTE(review): hdfsStoreLocation is presumably the tablePath passed as the first argument
    // above, which would make this a re-assignment of the same value - confirm against
    // CarbonMergerMapping.
    carbonLoadModel.setTablePath(carbonMergerMapping.hdfsStoreLocation)
    // Reload the load metadata details for the table status version of the load model's table.
    val tblStatusVersion = carbonLoadModel.getCarbonDataLoadSchema
      .getCarbonTable.getTableStatusVersion
    carbonLoadModel.setLoadMetadataDetails(
      SegmentStatusManager
        .readLoadMetadata(carbonTable.getMetadataPath, tblStatusVersion).toList.asJava)
    // trigger event for compaction
    val alterTableCompactionPreEvent: AlterTableCompactionPreEvent =
      AlterTableCompactionPreEvent(sqlContext.sparkSession,
        carbonTable,
        carbonMergerMapping,
        mergedLoadName)
    OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext)
    // Fire the index pre-execution event for the indexes returned by getAllCGAndFGIndexes. It
    // uses its own OperationContext, which is reused for the index post-execution event below.
    val tableIndexes = IndexStoreManager.getInstance().getAllCGAndFGIndexes(carbonTable)
    val indexOperationContext = new OperationContext()
    if (null != tableIndexes) {
      val indexNames: mutable.Buffer[String] =
        tableIndexes.asScala.map(index => index.getIndexSchema.getIndexName)
      val indexPreExecutionEvent: BuildIndexPreExecutionEvent =
        new BuildIndexPreExecutionEvent(sqlContext.sparkSession,
        carbonTable.getAbsoluteTableIdentifier, indexNames)
      OperationListenerBus.getInstance().fireEvent(indexPreExecutionEvent,
        indexOperationContext)
    }
    // accumulator to collect segment metadata
    val segmentMetaDataAccumulator = sqlContext
      .sparkContext
      .collectionAccumulator[Map[String, SegmentMetaDataInfo]]

    // Partition specs whose location does not start with the table path (filled in below).
    val updatePartitionSpecs : List[PartitionSpec] = new util.ArrayList[PartitionSpec]
    // Stays null unless the table is a hive partition table.
    var mergeRDD: CarbonMergerRDD[String, Boolean] = null
    if (carbonTable.isHivePartitionTable) {
      // collect related partitions
      // The RDD is created before the merge runs so that its partitions can be inspected here.
      mergeRDD = new CarbonMergerRDD(
        sc.sparkSession,
        new MergeResultImpl(),
        carbonLoadModel,
        carbonMergerMapping,
        segmentMetaDataAccumulator
      )
      // Assumes every partition carries a partitionSpec; `.get` presumably fails otherwise
      // (looks like an Option - confirm).
      val partitionSpecs = mergeRDD.getPartitions.map { partition =>
        partition.asInstanceOf[CarbonSparkPartition].partitionSpec.get
      }.distinct
      // NOTE(review): partitionSpecs is the result of map/distinct, so the null check looks
      // redundant; only the nonEmpty test appears to matter.
      if (partitionSpecs != null && partitionSpecs.nonEmpty) {
        compactionCallableModel.compactedPartitions = Some(partitionSpecs)
      }
      partitionSpecs.foreach(partitionSpec => {
        // NOTE(review): plain string prefix test; a sibling directory whose name merely starts
        // with the table path would also pass - confirm this cannot occur.
        if (!partitionSpec.getLocation.toString.startsWith(carbonLoadModel.getTablePath)) {
          // if partition spec added is external path,
          // after compaction location path to be updated with table path.
          updatePartitionSpecs.add(partitionSpec)
        }
      })
    }

    // Run the merge. The global-sort path is taken only when the sort scope is GLOBAL_SORT,
    // sort columns exist, no range column is set and isStandardCarbonTable returns true.
    // NOTE(review): in that branch a mergeRDD built above is not collected and
    // updatePartitionSpecs is not applied - confirm this is intended for partition tables.
    val mergeStatus =
      if (SortScope.GLOBAL_SORT == carbonTable.getSortScope &&
          !carbonTable.getSortColumns.isEmpty &&
          carbonTable.getRangeColumn == null &&
          CarbonUtil.isStandardCarbonTable(carbonTable)) {
        compactSegmentsByGlobalSort(sc.sparkSession,
          carbonLoadModel,
          carbonMergerMapping,
          segmentMetaDataAccumulator)
      } else {
        if (mergeRDD != null) {
          // Hive partition table: reuse the RDD created above.
          val result = mergeRDD.collect
          if (!updatePartitionSpecs.isEmpty) {
            val tableIdentifier = new TableIdentifier(carbonTable.getTableName,
              Some(carbonTable.getDatabaseName))
            val partitionSpecs = updatePartitionSpecs.asScala.map {
              partitionSpec =>
                // replaces old partitionSpec with updated partitionSpec
                mergeRDD.checkAndUpdatePartitionLocation(partitionSpec)
                PartitioningUtils.parsePathFragment(
                  String.join(CarbonCommonConstants.FILE_SEPARATOR, partitionSpec.getPartitions))
            }
            // To update partitionSpec in hive metastore, drop and add with latest path.
            // NOTE(review): the positional booleans are presumably ifExists = true,
            // purge = false, retainData = true for the drop and ifNotExists = false for the
            // add - confirm against the Spark command signatures in use.
            AlterTableDropPartitionCommand(
              tableIdentifier,
              partitionSpecs,
              true, false, true).run(sqlContext.sparkSession)
            AlterTableAddPartitionCommand(tableIdentifier,
              partitionSpecs.map(p => (p, None)), false).run(sqlContext.sparkSession)
          }
          result
        } else {
          new CarbonMergerRDD(
            sc.sparkSession,
            new MergeResultImpl(),
            carbonLoadModel,
            carbonMergerMapping,
            segmentMetaDataAccumulator
          ).collect
        }
      }

    // An empty result counts as failure; otherwise the second element of every result tuple
    // must be true.
    if (mergeStatus.length == 0) {
      finalMergeStatus = false
    } else {
      finalMergeStatus = mergeStatus.forall(_._2)
    }

    if (finalMergeStatus) {
      val mergedLoadNumber = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName)
      // Assigned by exactly one of the branches below and later passed to the status update.
      var segmentFileName: String = null

      val isMergeIndex = CarbonProperties.getInstance().getProperty(
        CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
        CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean

      // Index files are merged for every compaction type except IUD_DELETE_DELTA.
      if (compactionType != CompactionType.IUD_DELETE_DELTA && isMergeIndex) {
        MergeIndexUtil.mergeIndexFilesOnCompaction(compactionCallableModel)
      }

      if (carbonTable.isHivePartitionTable) {
        if (isMergeIndex) {
          // Rename the segment file named after the fact timestamp to the final name, which
          // is the same name prefixed with the merged load number and UNDERSCORE.
          val segmentTmpFileName = carbonLoadModel.getFactTimeStamp + CarbonTablePath.SEGMENT_EXT
          segmentFileName = mergedLoadNumber + CarbonCommonConstants.UNDERSCORE + segmentTmpFileName
          val segmentTmpFile = FileFactory.getCarbonFile(
            CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath, segmentTmpFileName))
          if (!segmentTmpFile.renameForce(
            CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath, segmentFileName))) {
            throw new Exception(s"Rename segment file from ${segmentTmpFileName} " +
              s"to ${segmentFileName} failed.")
          }
        } else {
          // By default carbon.merge.index.in.segment is true and this code will be used for
          // developer debugging purpose.
          val readPath =
            CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath) +
              CarbonCommonConstants.FILE_SEPARATOR + carbonLoadModel.getFactTimeStamp + ".tmp"
          // Merge all partition files into a single file.
          segmentFileName =
            mergedLoadNumber + CarbonCommonConstants.UNDERSCORE + carbonLoadModel.getFactTimeStamp
          val mergedSegmetFile = SegmentFileStore
            .mergeSegmentFiles(readPath,
              segmentFileName,
              CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath))
          // A null result from mergeSegmentFiles skips the move out of the ".tmp" folder.
          if (mergedSegmetFile != null) {
            SegmentFileStore
              .moveFromTempFolder(mergedSegmetFile,
                carbonLoadModel.getFactTimeStamp + ".tmp",
                carbonLoadModel.getTablePath)
          }
          // The extension is appended only after the merge call, which receives the bare name.
          segmentFileName = segmentFileName + CarbonTablePath.SEGMENT_EXT
        }
      } else {
        // Not a hive partition table: read the segment metadata collected in the accumulator
        // for the merged load and write the segment file with it.
        val segmentMetaDataInfo = CommonLoadUtils.getSegmentMetaDataInfoFromAccumulator(
          mergedLoadNumber,
          segmentMetaDataAccumulator)
        segmentFileName = SegmentFileStore.writeSegmentFile(
          carbonTable,
          mergedLoadNumber,
          carbonLoadModel.getFactTimeStamp.toString,
          segmentMetaDataInfo)
      }
      // clear segmentMetaDataAccumulator
      segmentMetaDataAccumulator.reset()
      // Used to inform the commit listener that the commit is fired from compaction flow.
      operationContext.setProperty("isCompaction", "true")
      // trigger event for compaction
      val alterTableCompactionPreStatusUpdateEvent =
      AlterTableCompactionPreStatusUpdateEvent(sc.sparkSession,
        carbonTable,
        carbonMergerMapping,
        carbonLoadModel,
        mergedLoadName)
      OperationListenerBus.getInstance
        .fireEvent(alterTableCompactionPreStatusUpdateEvent, operationContext)

      // NOTE(review): the logged value is a raw System.nanoTime() difference, i.e. nanoseconds,
      // and the message carries no unit.
      val endTime = System.nanoTime()
      LOGGER.info(s"time taken to merge $mergedLoadName is ${ endTime - startTime }")
      // Record the merge result in the table status; a false return aborts the compaction.
      val statusFileUpdate =
        CarbonDataMergerUtil.updateLoadMetadataWithMergeStatus(
          loadsToMerge,
          carbonTable.getMetadataPath,
          mergedLoadNumber,
          carbonLoadModel,
          compactionType,
          segmentFileName,
          MVManagerInSpark.get(sc.sparkSession))

      if (!statusFileUpdate) {
        LOGGER.error(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
                     s"${ carbonLoadModel.getTableName }")
        throw new Exception(s"Compaction failed to update metadata for table" +
                            s" ${ carbonLoadModel.getDatabaseName }." +
                            s"${ carbonLoadModel.getTableName }")
      }
      // Pass the load model's latest table status write version on for the table.
      CarbonHiveIndexMetadataUtil.updateTableStatusVersion(carbonTable,
        sc.sparkSession,
        carbonLoadModel.getLatestTableStatusWriteVersion)

      val compactionLoadStatusPostEvent = AlterTableCompactionPostStatusUpdateEvent(sc.sparkSession,
        carbonTable,
        carbonMergerMapping,
        carbonLoadModel,
        mergedLoadName)
      OperationListenerBus.getInstance()
        .fireEvent(compactionLoadStatusPostEvent, operationContext)
      // Counterpart of the index pre-execution event above, fired on the same index context.
      // NOTE(review): the meaning of the positional `null` and `true` arguments is not visible
      // here - confirm against BuildIndexPostExecutionEvent.
      if (null != tableIndexes) {
        val buildIndexPostExecutionEvent = new BuildIndexPostExecutionEvent(
          sqlContext.sparkSession, carbonTable.getAbsoluteTableIdentifier,
          null, Seq(mergedLoadNumber), true)
        OperationListenerBus.getInstance()
          .fireEvent(buildIndexPostExecutionEvent, indexOperationContext)
      }
      // "commitComplete" is read back from the operation context; when the property is absent
      // the commit is treated as complete.
      val commitDone = operationContext.getProperty("commitComplete")
      val commitComplete = if (null != commitDone) {
        commitDone.toString.toBoolean
      } else {
        true
      }
      if (!commitComplete) {
        LOGGER.error(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
                     s"${ carbonLoadModel.getTableName }")
        throw new Exception(s"Compaction failed to update metadata for table" +
                            s" ${ carbonLoadModel.getDatabaseName }." +
                            s"${ carbonLoadModel.getTableName }")
      } else {
        LOGGER.info(s"Compaction request completed for table " +
                    s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")

        // Pre-priming index for compaction
        // IUD_DELETE_DELTA primes the segments that were merged; MAJOR, MINOR and CUSTOM prime
        // only the merged load; any other compaction type primes nothing.
        val segmentsForPriming = if (compactionType.equals(CompactionType.IUD_DELETE_DELTA)) {
            validSegments.asScala.map(_.getSegmentNo).toList
        } else if (compactionType.equals(CompactionType.MAJOR) ||
                   compactionType.equals(CompactionType.MINOR) ||
                   compactionType.equals(CompactionType.CUSTOM)) {
            scala.List(mergedLoadNumber)
        } else {
          scala.List()
        }
        // The segment numbers of all valid (merged) segments are passed separately from the
        // segments selected for priming.
        DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession,
          carbonTable,
          validSegments.asScala.map(_.getSegmentNo).toList,
          operationContext,
          FileFactory.getConfiguration,
          segmentsForPriming)
      }
    } else {
      // The merge returned no results or at least one task reported failure.
      LOGGER.error(s"Compaction request failed for table " +
                   s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
      throw new Exception("Compaction Failure in Merger Rdd.")
    }
  }