in integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala [70:209]
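  /**
   * Merge the per-block index files of the given segments into one merge-index
   * file per segment. When `mergeIndexProperty` is true the merge is forced;
   * otherwise it runs only if CARBON_MERGE_INDEX_IN_SEGMENT is enabled.
   *
   * @return total size of the merged index files; non-zero only on the
   *         partition-table temp-folder path below, 0 otherwise
   */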
  def mergeIndexFiles(sparkSession: SparkSession,
      segmentIds: Seq[String],
      segmentFileNameToSegmentIdMap: java.util.Map[String, String],
      tablePath: String,
      carbonTable: CarbonTable,
      mergeIndexProperty: Boolean,
      partitionInfo: java.util.List[String] = new java.util.ArrayList[String](),
      tempFolderPath: String = null,
      readFileFooterFromCarbonDataFile: Boolean = false,
      currPartitionSpec: Option[String] = None
  ): Long = {
    var mergeIndexSize = 0L
    if (mergeIndexProperty) {
      new CarbonMergeFilesRDD(
        sparkSession,
        carbonTable,
        segmentIds,
        segmentFileNameToSegmentIdMap,
        carbonTable.isHivePartitionTable,
        readFileFooterFromCarbonDataFile,
        partitionInfo,
        tempFolderPath,
        currPartitionSpec).collect()
    } else {
      try {
        // Honour the user-configurable switch that enables index merging.
        if (isPropertySet(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT)) {
          val mergeFilesRDD = new CarbonMergeFilesRDD(
            sparkSession,
            carbonTable,
            segmentIds,
            segmentFileNameToSegmentIdMap,
            carbonTable.isHivePartitionTable,
            readFileFooterFromCarbonDataFile,
            partitionInfo,
            tempFolderPath,
            currPartitionSpec
          )
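          // Three execution modes follow: (1) partition table with a temp folder:
          // distribute the merge and stitch the per-task segment files together;
          // (2) partition table with several segments: just distribute;
          // (3) otherwise: run the merge inline on the driver.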
          if (carbonTable.isHivePartitionTable &&
              null != partitionInfo && !partitionInfo.isEmpty &&
              !StringUtils.isEmpty(tempFolderPath)) {
            // Distribute the merge; collect() blocks until every task is done.
            // Each returned row is (merged index size, per-task SegmentFile).
            val rows = mergeFilesRDD.collect()
            mergeIndexSize = rows.map(r => java.lang.Long.parseLong(r._1)).sum
            val segmentFiles = rows.map(_._2)
            if (segmentFiles.length > 0) {
              val finalSegmentFile = if (segmentFiles.length == 1) {
                segmentFiles(0)
              } else {
                // SegmentFile.merge mutates its receiver, so folding every file
                // into the first one accumulates all entries there.
                val temp = segmentFiles(0)
                (1 until segmentFiles.length).foreach { index =>
                  temp.merge(segmentFiles(index))
                }
                temp
              }
              val segmentFilesLocation =
                CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath)
              val locationFile = FileFactory.getCarbonFile(segmentFilesLocation)
              if (!locationFile.exists()) {
                locationFile.mkdirs()
              }
              // Derive the final segment file name from the temp folder name,
              // e.g. "0_<timestamp>.tmp" becomes "0_<timestamp>.segment".
              val segmentFilePath =
                CarbonTablePath
                  .getSegmentFilePath(carbonTable.getTablePath,
                    tempFolderPath.replace(".tmp", CarbonTablePath.SEGMENT_EXT))
              SegmentFileStore.writeSegmentFile(finalSegmentFile, segmentFilePath)
            }
          } else if (carbonTable.isHivePartitionTable && segmentIds.size > 1) {
            // Distribute the merge across executors and wait for completion.
            mergeFilesRDD.collect()
          } else {
            // Run the merge synchronously on the driver, partition by partition,
            // without scheduling a Spark job.
            mergeFilesRDD.internalGetPartitions.foreach(
              partition => mergeFilesRDD.internalCompute(partition, null)
            )
          }
        }
      } catch {
        case ex: Exception =>
          val message = "Merge index files request failed " +
            s"for table ${ carbonTable.getTableUniqueName }. " + ex.getMessage
          LOGGER.error(message)
          throw new RuntimeException(message, ex)
      }
    }
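    // Post-merge cleanup and finalization for partition tables: either delete the
    // per-partition temp folders (temp-folder flow), or stitch the per-partition
    // ".tmp" segment files into a new segment file and update the table status.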
    if (carbonTable.isHivePartitionTable && !StringUtils.isEmpty(tempFolderPath)) {
      // Remove the temp index-file folder of every partition, in parallel.
      val startDelete = System.currentTimeMillis()
      val numThreads = Math.min(Math.max(partitionInfo.size(), 1), 10)
      val executorService: ExecutorService = Executors.newFixedThreadPool(numThreads)
      try {
        val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
        partitionInfo
          .asScala
          .map { partitionPath =>
            executorService.submit(new Runnable {
              override def run(): Unit = {
                // Propagate the session info to the worker thread before deleting.
                ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
                FileFactory.deleteAllCarbonFilesOfDir(
                  FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath))
              }
            })
          }
          // Block until every delete task has finished.
          .map(_.get())
      } finally {
        if (executorService != null && !executorService.isShutdown) {
          executorService.shutdownNow()
        }
      }
      LOGGER.info("Time taken to remove partition files for all partitions: " +
        (System.currentTimeMillis() - startDelete) + " ms")
    } else if (carbonTable.isHivePartitionTable) {
      segmentIds.foreach(segmentId => {
        val readPath: String = CarbonTablePath.getSegmentFilesLocation(tablePath) +
          CarbonCommonConstants.FILE_SEPARATOR + segmentId + "_" +
          segmentFileNameToSegmentIdMap.get(segmentId) + ".tmp"
        // Generate a new timestamp for the segment file instead of overwriting
        // the existing one.
        val uuid = String.valueOf(System.currentTimeMillis)
        val newSegmentFileName = SegmentFileStore.genSegmentFileName(segmentId, uuid)
        // Merge all partition files into a single segment file.
        val segmentFile = SegmentFileStore
          .mergeSegmentFiles(readPath,
            newSegmentFileName,
            CarbonTablePath.getSegmentFilesLocation(tablePath))
        if (segmentFile != null) {
          val sfs = new SegmentFileStore(tablePath, newSegmentFileName +
            CarbonTablePath.SEGMENT_EXT)
          // After compacting the segment index, point the table status at the
          // newly written segment file.
          val status = SegmentFileStore.updateTableStatusFile(carbonTable, segmentId,
            newSegmentFileName + CarbonTablePath.SEGMENT_EXT,
            carbonTable.getCarbonTableIdentifier.getTableId, sfs,
            carbonTable.getTableStatusVersion)
          if (!status) {
            throw new IOException("Table status update with merge index file failed")
          }
        }
      })
    }
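    // Non-zero only when the partition temp-folder flow above computed a size.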
    mergeIndexSize
  }
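
  // Minimal usage sketch (hypothetical caller, for illustration only). `session`,
  // `table`, and `segmentFileNameMap` are assumed to be an active SparkSession,
  // a resolved CarbonTable, and the segment-file-name map for the loaded
  // segments; none of these names come from this file:
  //
  //   val mergedSize = CarbonMergeFilesRDD.mergeIndexFiles(
  //     sparkSession = session,
  //     segmentIds = Seq("0", "1"),
  //     segmentFileNameToSegmentIdMap = segmentFileNameMap,
  //     tablePath = table.getTablePath,
  //     carbonTable = table,
  //     mergeIndexProperty = false)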