override def onEvent()

in integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala [44:202]


  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
    event match {
      case preStatusUpdateEvent: LoadTablePreStatusUpdateEvent =>
        LOGGER.info("Load post status event-listener called for merge index")
        val loadModel = preStatusUpdateEvent.getCarbonLoadModel
        val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable
        val compactedSegments = loadModel.getMergedSegmentIds
        val sparkSession = SparkSession.getActiveSession.get
        var partitionInfo: util.List[String] = new util.ArrayList[String]()
        val partitionPath = operationContext.getProperty("partitionPath")
        if (partitionPath != null) {
          partitionInfo = ObjectSerializationUtil
            .convertStringToObject(partitionPath.asInstanceOf[String])
            .asInstanceOf[util.List[String]]
        }
        val tempPath = operationContext.getProperty("tempPath")
        val loadMetaDetails = loadModel.getCurrentLoadMetadataDetail
        if (loadMetaDetails != null && !loadMetaDetails.getFileFormat.equals(FileFormat.ROW_V1)) {
          if (null != compactedSegments && !compactedSegments.isEmpty) {
            MergeIndexUtil.mergeIndexFilesForCompactedSegments(sparkSession,
              carbonTable,
              compactedSegments)
          } else {
            val segmentFileNameMap: java.util.Map[String, String] = new util.HashMap[String,
              String]()

            segmentFileNameMap
              .put(loadModel.getSegmentId, String.valueOf(loadModel.getFactTimeStamp))
            val startTime = System.currentTimeMillis()
            val currPartitionSpec = operationContext.getProperty("carbon.currentpartition")
            val currPartitionSpecOption: Option[String] = if (currPartitionSpec == null) {
              None
            } else {
              Option(currPartitionSpec.asInstanceOf[String])
            }
            val indexSize = CarbonMergeFilesRDD.mergeIndexFiles(sparkSession,
              Seq(loadModel.getSegmentId),
              segmentFileNameMap,
              carbonTable.getTablePath,
              carbonTable, false, partitionInfo,
              if (tempPath == null) {
                null
              } else {
                tempPath.toString
              },
              currPartitionSpec = currPartitionSpecOption
            )
            val metrics = new DataLoadMetrics
            metrics.setMergeIndexSize(indexSize)
            loadModel.setMetrics(metrics)
            LOGGER.info("Total time taken for merge index " +
                        (System.currentTimeMillis() - startTime))
            // clear Block index Cache
            MergeIndexUtil.clearBlockIndexCache(carbonTable, Seq(loadModel.getSegmentId))
          }
        }
      case alterTableMergeIndexEvent: AlterTableMergeIndexEvent =>
        val carbonMainTable = alterTableMergeIndexEvent.carbonTable
        val sparkSession = alterTableMergeIndexEvent.sparkSession
        LOGGER.info(s"Merge Index request received for table " +
                    s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }")
        val lock = CarbonLockFactory.getCarbonLockObj(
          carbonMainTable.getAbsoluteTableIdentifier,
          LockUsage.COMPACTION_LOCK)

        try {
          if (lock.lockWithRetries()) {
            LOGGER.info("Acquired the compaction lock for table" +
                        s" ${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName}")
            val loadFolderDetailsArray = SegmentStatusManager.readLoadMetadata(carbonMainTable
              .getMetadataPath, carbonMainTable.getTableStatusVersion)
            val segmentFileNameMap: java.util.Map[String, String] = new util.HashMap[String,
              String]()
            var streamingSegment: Set[String] = Set[String]()
            loadFolderDetailsArray.foreach(loadMetadataDetails => {
              if (loadMetadataDetails.getFileFormat.equals(FileFormat.ROW_V1)) {
                streamingSegment += loadMetadataDetails.getLoadName
              }
              segmentFileNameMap
                .put(loadMetadataDetails.getLoadName,
                  String.valueOf(loadMetadataDetails.getLoadStartTime))
            })
            val validSegments =
              CarbonDataMergerUtil.getValidSegmentList(carbonMainTable).asScala
            var segmentsToMerge =
              if (alterTableMergeIndexEvent.alterTableModel.customSegmentIds.isEmpty) {
                val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
                validSegments.foreach { segment =>
                  // do not add ROW_V1 format
                  if (!segment.getLoadMetadataDetails.getFileFormat.equals(FileFormat.ROW_V1)) {
                    validSegmentIds += segment.getSegmentNo
                  }
                }
                validSegmentIds
              } else {
                alterTableMergeIndexEvent.alterTableModel
                  .customSegmentIds
                  .get
                  .filterNot(streamingSegment.contains(_))
              }
            validSegments.filter(x => segmentsToMerge.contains(x.getSegmentNo)).foreach { segment =>
              val segmentFile = segment.getSegmentFileName
              val sfs = new SegmentFileStore(carbonMainTable.getTablePath, segmentFile)
              if (sfs.getSegmentFile != null) {
                val indexFiles = sfs.getIndexCarbonFiles
                val segmentPath = CarbonTablePath
                  .getSegmentPath(carbonMainTable.getTablePath, segment.getSegmentNo)
                if (indexFiles.size() == 0) {
                  LOGGER.warn(s"No index files present in path: $segmentPath to merge")
                  // call merge if segments have index files
                  segmentsToMerge = segmentsToMerge.toStream
                    .filterNot(s => s.equals(segment.getSegmentNo)).toList
                }
              }
            }
            // in case of merge index file creation using Alter DDL command
            // readFileFooterFromCarbonDataFile flag should be true. This flag is check for legacy
            // store (store <= 1.1 version) and create merge Index file as per new store so that
            // old store is also upgraded to new store
            val startTime = System.currentTimeMillis()
            val partitionInfo: util.List[String] = operationContext
              .getProperty("partitionPath")
              .asInstanceOf[util.List[String]]
            val currPartitionSpec = operationContext.getProperty("carbon.currentpartition")
            val currPartitionSpecOption: Option[String] = if (currPartitionSpec == null) {
              None
            } else {
              Option(currPartitionSpec.asInstanceOf[String])
            }
            CarbonMergeFilesRDD.mergeIndexFiles(
              sparkSession = sparkSession,
              segmentIds = segmentsToMerge,
              segmentFileNameToSegmentIdMap = segmentFileNameMap,
              tablePath = carbonMainTable.getTablePath,
              carbonTable = carbonMainTable,
              mergeIndexProperty = true,
              readFileFooterFromCarbonDataFile = true,
              partitionInfo = partitionInfo,
              currPartitionSpec = currPartitionSpecOption)
            LOGGER.info("Total time taken for merge index "
                        + (System.currentTimeMillis() - startTime) + "ms")
            // clear Block index Cache
            MergeIndexUtil.clearBlockIndexCache(carbonMainTable, segmentsToMerge)
            val requestMessage = "Compaction request completed for table " +
              s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }"
            LOGGER.info(requestMessage)
          } else {
            val lockMessage = "Not able to acquire the compaction lock for table " +
                              s"${ carbonMainTable.getDatabaseName }." +
                              s"${ carbonMainTable.getTableName}"
            LOGGER.error(lockMessage)
            CarbonException.analysisException(
              "Table is already locked for compaction. Please try after some time.")
          }
        } finally {
          lock.unlock()
        }
    }
  }