def mergeIndexFiles()

in integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala [70:209]
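
This method merges the per-block carbonindex files of the given segments into merged index files. It takes one of three paths: a forced distributed merge when mergeIndexProperty is true; a property-gated merge that runs distributed for Hive-partitioned tables and inline on the driver otherwise; and, for Hive-partitioned tables, a final phase that either deletes the temporary index folders or rewrites the segment file and table status. The returned Long is the accumulated merged index size, which is only populated on the partitioned temp-folder path.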


  def mergeIndexFiles(sparkSession: SparkSession,
      segmentIds: Seq[String],
      segmentFileNameToSegmentIdMap: java.util.Map[String, String],
      tablePath: String,
      carbonTable: CarbonTable,
      mergeIndexProperty: Boolean,
      partitionInfo: java.util.List[String] = new java.util.ArrayList[String](),
      tempFolderPath: String = null,
      readFileFooterFromCarbonDataFile: Boolean = false,
      currPartitionSpec: Option[String] = None
  ): Long = {
    var mergeIndexSize = 0L
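    // Explicitly requested merge: always run as a distributed CarbonMergeFilesRDD job.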
    if (mergeIndexProperty) {
      new CarbonMergeFilesRDD(
        sparkSession,
        carbonTable,
        segmentIds,
        segmentFileNameToSegmentIdMap,
        carbonTable.isHivePartitionTable,
        readFileFooterFromCarbonDataFile,
        partitionInfo,
        tempFolderPath,
        currPartitionSpec).collect()
    } else {
      try {
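        // Merge only when the merge-index-in-segment property (or its default) is enabled.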
        if (isPropertySet(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT)) {
          val mergeFilesRDD = new CarbonMergeFilesRDD(
            sparkSession,
            carbonTable,
            segmentIds,
            segmentFileNameToSegmentIdMap,
            carbonTable.isHivePartitionTable,
            readFileFooterFromCarbonDataFile,
            partitionInfo,
            tempFolderPath,
            currPartitionSpec
          )
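          // Partitioned load with a temp folder: collect the per-partition results
          // and consolidate them into a single segment file for the load.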
          if (carbonTable.isHivePartitionTable &&
              null != partitionInfo && !partitionInfo.isEmpty &&
              !StringUtils.isEmpty(tempFolderPath)) {
            // Distributed merge: run the RDD job on the executors and collect the results.
            val rows = mergeFilesRDD.collect()
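            // Each collected row is (merged index size as a string, the partition's SegmentFile).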
            mergeIndexSize = rows.map(r => java.lang.Long.parseLong(r._1)).sum
            val segmentFiles = rows.map(_._2)
            if (segmentFiles.nonEmpty) {
              val finalSegmentFile = if (segmentFiles.length == 1) {
                segmentFiles(0)
              } else {
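                // Fold the remaining SegmentFiles into the first one; the loop
                // relies on merge() updating 'temp' in place.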
                val temp = segmentFiles(0)
                (1 until segmentFiles.length).foreach { index =>
                  temp.merge(segmentFiles(index))
                }
                temp
              }

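              // Ensure the segment-files directory exists, then persist the merged
              // SegmentFile, deriving its name from the temp folder (.tmp -> segment ext).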
              val segmentFilesLocation =
                CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath)
              val locationFile = FileFactory.getCarbonFile(segmentFilesLocation)
              if (!locationFile.exists()) {
                locationFile.mkdirs()
              }
              val segmentFilePath =
                CarbonTablePath
                  .getSegmentFilePath(carbonTable.getTablePath,
                    tempFolderPath.replace(".tmp", CarbonTablePath.SEGMENT_EXT))
              SegmentFileStore.writeSegmentFile(finalSegmentFile, segmentFilePath)
            }
          } else if (carbonTable.isHivePartitionTable && segmentIds.size > 1) {
            // Distributed merge across multiple segments; only completion matters here.
            mergeFilesRDD.collect()
          } else {
            // Synchronous merge: compute each partition inline on the driver
            // instead of submitting a Spark job.
            mergeFilesRDD.internalGetPartitions.foreach(
              partition => mergeFilesRDD.internalCompute(partition, null)
            )
          }
        }
      } catch {
        case ex: Exception =>
          val message = "Merge index files request failed " +
                        s"for table ${ carbonTable.getTableUniqueName }. " + ex.getMessage
          LOGGER.error(message)
          throw new RuntimeException(message, ex)
      }
    }
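    // For partitioned loads that used a temp folder, delete every partition's
    // temporary index folder in parallel.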
    if (carbonTable.isHivePartitionTable && !StringUtils.isEmpty(tempFolderPath)) {
      // Remove all temporary index-file folders, one deletion task per partition.
      val startDelete = System.currentTimeMillis()
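      // Size the pool to the partition count, bounded between 1 and 10 threads;
      // tasks are submitted for every partition first, then awaited via get().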
      val numThreads = Math.min(Math.max(partitionInfo.size(), 1), 10)
      val executorService: ExecutorService = Executors.newFixedThreadPool(numThreads)
      try {
        val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
        partitionInfo
          .asScala
          .map { partitionPath =>
            executorService.submit(new Runnable {
              override def run(): Unit = {
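                // Propagate the Carbon session info to the pooled thread so that
                // file operations see the caller's session configuration.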
                ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
                FileFactory.deleteAllCarbonFilesOfDir(
                  FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath))
              }
            })
          }
          .map(_.get())
      } finally {
        if (executorService != null && !executorService.isShutdown) {
          executorService.shutdownNow()
        }
      }
      LOGGER.info("Time taken to remove temporary folders for all partitions: " +
                  (System.currentTimeMillis() - startDelete) + " ms")
    } else if (carbonTable.isHivePartitionTable) {
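      // Partitioned table without a temp folder (e.g. segment_index compaction):
      // merge each segment's temporary partition files into a new timestamped
      // segment file and update the table status.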
      segmentIds.foreach(segmentId => {
        val readPath: String = CarbonTablePath.getSegmentFilesLocation(tablePath) +
                               CarbonCommonConstants.FILE_SEPARATOR + segmentId + "_" +
                               segmentFileNameToSegmentIdMap.get(segmentId) + ".tmp"
        // Generate a new timestamp for the segment file instead of overwriting the existing one.
        val uuid = String.valueOf(System.currentTimeMillis)
        val newSegmentFileName = SegmentFileStore.genSegmentFileName(segmentId, uuid)
        // Merge all partition files into a single file.
        val segmentFile = SegmentFileStore
          .mergeSegmentFiles(readPath,
            newSegmentFileName,
            CarbonTablePath.getSegmentFilesLocation(tablePath))
        if (segmentFile != null) {
          val sfs = new SegmentFileStore(tablePath, newSegmentFileName +
            CarbonTablePath.SEGMENT_EXT)
          // For segment_index compaction, update the table status with the new segment file name.
          val status = SegmentFileStore.updateTableStatusFile(carbonTable, segmentId,
            newSegmentFileName + CarbonTablePath.SEGMENT_EXT,
            carbonTable.getCarbonTableIdentifier.getTableId, sfs, carbonTable.getTableStatusVersion)
          if (!status) {
            throw new IOException("Table status update with merge index file failed")
          }
        }
      })
    }
    mergeIndexSize
  }
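
A minimal call-site sketch, assuming an active SparkSession named spark, a resolved CarbonTable named table, and that mergeIndexFiles is exposed on the CarbonMergeFilesRDD companion object (as the file layout suggests). The segment id "0" and the map contents are illustrative, not taken from the source:

  // Hypothetical call site: merge the index files of segment "0" after a load,
  // honoring the CARBON_MERGE_INDEX_IN_SEGMENT property (mergeIndexProperty = false).
  val segmentFileMap = new java.util.HashMap[String, String]()
  segmentFileMap.put("0", "0") // illustrative entry
  val mergedSize: Long = CarbonMergeFilesRDD.mergeIndexFiles(
    sparkSession = spark,
    segmentIds = Seq("0"),
    segmentFileNameToSegmentIdMap = segmentFileMap,
    tablePath = table.getTablePath,
    carbonTable = table,
    mergeIndexProperty = false)

Note that with mergeIndexProperty = false the merge only happens if the CARBON_MERGE_INDEX_IN_SEGMENT property is enabled, and the returned size stays 0 outside the partitioned temp-folder path.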