private def triggerCompaction()

in integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala [159:324]


  /**
   * Rebuilds the data files of the secondary index segments selected for compaction, merges
   * their index files, writes new segment files and returns the set of rebuilt segment IDs.
   */
  private def triggerCompaction(compactionCallableModel: CompactionCallableModel,
    segmentIdToLoadStartTimeMapping: scala.collection.mutable.Map[String, java.lang.Long])
    (sqlContext: SQLContext): Set[String] = {
    val indexCarbonTable = compactionCallableModel.carbonTable
    val sc = compactionCallableModel.sqlContext
    val carbonLoadModel = compactionCallableModel.carbonLoadModel
    val compactionType = compactionCallableModel.compactionType
    val partitions = compactionCallableModel.currentPartitions
    val tablePath = indexCarbonTable.getTablePath
    val startTime = System.nanoTime()
    var finalMergeStatus = false
    val databaseName: String = indexCarbonTable.getDatabaseName
    val factTableName = indexCarbonTable.getTableName
    val validSegments: util.List[Segment] = CarbonDataMergerUtil
      .getValidSegments(compactionCallableModel.loadsToMerge)
    val mergedLoadName: String = ""
    val carbonMergerMapping = CarbonMergerMapping(
      tablePath,
      indexCarbonTable.getMetadataPath,
      mergedLoadName,
      databaseName,
      factTableName,
      validSegments.asScala.toArray,
      indexCarbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
      compactionType,
      maxSegmentColumnSchemaList = null,
      currentPartitions = partitions)
    carbonLoadModel.setTablePath(carbonMergerMapping.hdfsStoreLocation)
    carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager.readLoadMetadata(
      indexCarbonTable.getMetadataPath, indexCarbonTable.getTableStatusVersion).toList.asJava)

    val mergedSegments: util.Set[LoadMetadataDetails] = new util.HashSet[LoadMetadataDetails]()
    var rebuiltSegments: Set[String] = Set[String]()
    val segmentIdToLoadStartTimeMap: util.Map[String, String] = new util.HashMap()
    try {
      var siRebuildRDD: CarbonSIRebuildRDD[String, Boolean] = null
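      // For a global-sort SI table with sort columns, merge the segment data files directly;
      // otherwise rebuild the segment data files through CarbonSIRebuildRDD.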
      val mergeStatus = if (SortScope.GLOBAL_SORT == indexCarbonTable.getSortScope &&
          !indexCarbonTable.getSortColumns.isEmpty) {
        mergeSISegmentDataFiles(sc.sparkSession, carbonLoadModel, carbonMergerMapping)
      } else {
        siRebuildRDD = new CarbonSIRebuildRDD(sc.sparkSession,
          new MergeResultImpl(),
          carbonLoadModel,
          carbonMergerMapping)
        siRebuildRDD.collect
      }
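      // An empty merge status means no data files needed rewriting; otherwise collect the
      // rebuilt segments along with their original load start times.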
      if (null != mergeStatus && mergeStatus.length == 0) {
        finalMergeStatus = true
      } else {
        finalMergeStatus = mergeStatus.forall(_._1._2)
        rebuiltSegments = mergeStatus.map(_._2).toSet
        compactionCallableModel.loadsToMerge.asScala.foreach(metadataDetails => {
          if (rebuiltSegments.contains(metadataDetails.getLoadName)) {
            mergedSegments.add(metadataDetails)
            segmentIdToLoadStartTimeMap
              .put(metadataDetails.getLoadName, String.valueOf(metadataDetails.getLoadStartTime))
          }
        })
      }
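      // On success, delete the stale index/data files, merge the index files of the rebuilt
      // segments and write new segment files for them.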
      if (finalMergeStatus) {
        if (null != mergeStatus && mergeStatus.length != 0) {
          // If the segment file has not yet been written during SI load, the old index files
          // can be deleted immediately. If the segment file is already present and an SI
          // refresh is triggered, do not delete them immediately, to avoid failures in
          // parallel queries.
          val validSegmentsToUse = validSegments.asScala
            .filter(segment => {
              val segmentFilePath = CarbonTablePath.getSegmentFilesLocation(tablePath) +
                                    CarbonCommonConstants.FILE_SEPARATOR +
                                    segment.getSegmentFileName
              mergeStatus.map(_._2).toSet.contains(segment.getSegmentNo) &&
              !FileFactory.isFileExist(segmentFilePath)
            })
          deleteOldIndexOrMergeIndexFiles(
            carbonLoadModel.getFactTimeStamp,
            validSegmentsToUse.toList.asJava,
            indexCarbonTable)
          if (SortScope.GLOBAL_SORT == indexCarbonTable.getSortScope &&
            !indexCarbonTable.getSortColumns.isEmpty) {
            deleteOldCarbonDataFiles(carbonLoadModel.getFactTimeStamp,
              validSegmentsToUse.toList.asJava,
              indexCarbonTable)
          } else {
            siRebuildRDD.partitions.foreach { partition =>
              val carbonSparkPartition = partition.asInstanceOf[CarbonSparkPartition]
              deleteOldCarbonDataFiles(carbonSparkPartition, validSegmentsToUse.toList)
            }
          }
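          // map each merged segment to the fact timestamp used as its new load start time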
          val segmentToLoadStartTimeMap: scala.collection.mutable.Map[String, java.lang.Long] =
            scala.collection.mutable.Map()
          // merge index files and write segment file for merged segments
          mergedSegments.asScala.map { seg =>
            val segmentPath = CarbonTablePath.getSegmentPath(tablePath, seg.getLoadName)
            try {
              new CarbonIndexFileMergeWriter(indexCarbonTable)
                .writeMergeIndexFileBasedOnSegmentFolder(null, false, segmentPath,
                  seg.getLoadName, carbonLoadModel.getFactTimeStamp.toString,
                  true)
            } catch {
              case e: IOException =>
                val message =
                  s"Failed to merge index files in path: $segmentPath. ${e.getMessage}"
                LOGGER.error(message)
                throw new RuntimeException(message, e)
            }
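            // write a new segment file for the merged segment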
            val file = SegmentFileStore.writeSegmentFile(
              indexCarbonTable,
              seg.getLoadName,
              carbonLoadModel.getFactTimeStamp.toString,
              null,
              null)
            segmentToLoadStartTimeMap.put(seg.getLoadName,
              carbonLoadModel.getFactTimeStamp)
            // clear the indexSchema cache for the merged segments, as the index files and
            // data files are rewritten after compaction
            if (CarbonProperties.getInstance()
              .isDistributedPruningEnabled(indexCarbonTable.getDatabaseName,
                indexCarbonTable.getTableName)) {
              try {
                IndexServer.getClient
                  .invalidateSegmentCache(indexCarbonTable,
                    rebuiltSegments.toArray,
                    SparkSQLUtil.getTaskGroupId(sc.sparkSession))
              } catch {
                // ignore failures while invalidating the index server segment cache
                case _: Exception =>
              }
            }
            new Segment(seg.getLoadName, file)
          }
          // Table status update can be skipped here for a compaction call: in that case the
          // table status is updated to Compacted/Success by the logic in CarbonTableCompactor.
          if (compactionType == null) {
            FileInternalUtil.updateTableStatus(
              rebuiltSegments.toList,
              carbonLoadModel,
              indexCarbonTable.getTableName,
              SegmentStatus.SUCCESS,
              segmentToLoadStartTimeMap,
              new java.util.HashMap[String, String],
              indexCarbonTable,
              sc.sparkSession,
              carbonLoadModel.getFactTimeStamp,
              rebuiltSegments)
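            // clear the rebuilt segments from the cached indexes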
            IndexStoreManager.getInstance
              .clearInvalidSegments(indexCarbonTable, rebuiltSegments.toList.asJava)
          }
        }

        val endTime = System.nanoTime()
        LOGGER.info(s"Time taken to merge (in nanoseconds): ${endTime - startTime}")
        LOGGER.info(s"Merge data files request completed for table " +
                    s"${indexCarbonTable.getDatabaseName}.${indexCarbonTable.getTableName}")
        rebuiltSegments
      } else {
        LOGGER.error(s"Merge data files request failed for table " +
                     s"${indexCarbonTable.getDatabaseName}.${indexCarbonTable.getTableName}")
        throw new Exception("Merge data files failure in merger RDD.")
      }
    } catch {
      case e: Exception =>
        LOGGER.error(s"Merge data files request failed for table " +
                     s"${indexCarbonTable.getDatabaseName}.${indexCarbonTable.getTableName}")
        throw new Exception("Merge data files failure in merger RDD.", e)
    }
  }