in integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala [227:406]
private def writeMetaForSegment(
sparkSession: SparkSession,
carbonTable: CarbonTable,
segmentPath: String,
partitionSpecOp: Option[CarbonPartitionSpec] = None,
partitionDataFiles: Seq[FileStatus] = Seq.empty
): Unit = {
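// Build a load model carrying the table identity and schema; the segment
// metadata written below is derived from this model.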
val model = new CarbonLoadModel
model.setCarbonTransactionalTable(true)
model.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
model.setDatabaseName(carbonTable.getDatabaseName)
model.setTableName(carbonTable.getTableName)
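// Fire pre-load events so table indexes can prepare for the incoming segment.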
val operationContext = new OperationContext
operationContext.setProperty("isLoadOrCompaction", false)
val (tableIndexes, indexOperationContext) = CommonLoadUtils.firePreLoadEvents(sparkSession,
model,
"",
segmentPath,
options.asJava,
options.asJava,
false,
false,
None,
operationContext)
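// Create a new load metadata entry in INSERT_IN_PROGRESS state, pointing at the
// external segment path, and record the file format if it is not carbon.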
val newLoadMetaEntry = new LoadMetadataDetails
model.setFactTimeStamp(CarbonUpdateUtil.readCurrentTime)
CarbonLoaderUtil.populateNewLoadMetaEntry(newLoadMetaEntry,
SegmentStatus.INSERT_IN_PROGRESS,
model.getFactTimeStamp,
false)
newLoadMetaEntry.setPath(segmentPath)
val format = options.getOrElse("format", "carbondata")
val isCarbonFormat = format.equalsIgnoreCase("carbondata") ||
format.equalsIgnoreCase("carbon")
if (!isCarbonFormat) {
newLoadMetaEntry.setFileFormat(new FileFormat(format))
}
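// Scan the segment path for delete delta files; their presence means the added
// load carries deleted rows that must be reflected in the segment update status.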
val deltaFiles = FileFactory.getCarbonFile(segmentPath).listFiles(new CarbonFileFilter() {
override def accept(file: CarbonFile): Boolean = file.getName
.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
})
val updateTimestamp = System.currentTimeMillis().toString
var isUpdateStatusRequired = false
if (deltaFiles.nonEmpty) {
LOGGER.warn("Adding a modified load to the table. If there is any updated segment for this" +
"load, please add updated segment also.")
val blockNameToDeltaFilesMap =
collection.mutable.Map[String, collection.mutable.ListBuffer[(CarbonFile, String)]]()
deltaFiles.foreach { deltaFile =>
val tmpDeltaFilePath = deltaFile.getAbsolutePath
.replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR,
CarbonCommonConstants.FILE_SEPARATOR)
val deltaFilePathElements = tmpDeltaFilePath.split(CarbonCommonConstants.FILE_SEPARATOR)
if (deltaFilePathElements != null && deltaFilePathElements.nonEmpty) {
val deltaFileName = deltaFilePathElements(deltaFilePathElements.length - 1)
val blockName = CarbonTablePath.DataFileUtil
.getBlockNameFromDeleteDeltaFile(deltaFileName)
if (blockNameToDeltaFilesMap.contains(blockName)) {
blockNameToDeltaFilesMap(blockName) += ((deltaFile, deltaFileName))
} else {
val deltaFileList = new ListBuffer[(CarbonFile, String)]()
deltaFileList += ((deltaFile, deltaFileName))
blockNameToDeltaFilesMap.put(blockName, deltaFileList)
}
}
}
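// Build one SegmentUpdateDetails entry per block, filling in the valid delta file
// timestamps and deleted row counts, and persist them to the update status file.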
val segmentUpdateDetails = new util.ArrayList[SegmentUpdateDetails]()
val columnCompressor = CompressorFactory.getInstance.getCompressor.getName
blockNameToDeltaFilesMap.foreach { entry =>
val segmentUpdateDetail = new SegmentUpdateDetails()
segmentUpdateDetail.setBlockName(entry._1)
segmentUpdateDetail.setActualBlockName(
entry._1 + CarbonCommonConstants.POINT + columnCompressor +
CarbonCommonConstants.FACT_FILE_EXT)
segmentUpdateDetail.setSegmentName(model.getSegmentId)
val blockNameElements = entry._1.split(CarbonCommonConstants.HYPHEN)
if (blockNameElements != null && blockNameElements.nonEmpty) {
val segmentId = blockNameElements(blockNameElements.length - 1)
// Segment ID is null for segments written through the SDK
if (segmentId.equals("null")) {
readAllDeltaFiles(entry._2, segmentUpdateDetail)
} else {
setValidDeltaFileAndDeletedRowCount(entry._2, segmentUpdateDetail)
}
}
segmentUpdateDetails.add(segmentUpdateDetail)
}
CarbonUpdateUtil.updateSegmentStatus(segmentUpdateDetails,
carbonTable,
updateTimestamp,
false,
true)
isUpdateStatusRequired = true
newLoadMetaEntry.setUpdateDeltaStartTimestamp(updateTimestamp)
newLoadMetaEntry.setUpdateDeltaEndTimestamp(updateTimestamp)
}
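// Record the new load entry in the table status file (together with the update
// status timestamp, if delete delta files were found above).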
CarbonLoaderUtil.recordNewLoadMetadata(newLoadMetaEntry, model, true, false, updateTimestamp,
isUpdateStatusRequired)
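// Create the segment handle with a freshly generated segment file name.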
val segment = new Segment(
model.getSegmentId,
SegmentFileStore.genSegmentFileName(
model.getSegmentId,
System.nanoTime().toString) + CarbonTablePath.SEGMENT_EXT,
segmentPath,
new util.HashMap[String, String](options.asJava))
// Merge the index files of the added segment; this is only done for the carbon format
if (isCarbonFormat) {
CarbonLoaderUtil.mergeIndexFilesInAddLoadSegment(carbonTable,
model.getSegmentId,
segmentPath,
model.getFactTimeStamp.toString)
// clear the block index cache for this segment
SegmentFileStore.clearBlockIndexCache(carbonTable, model.getSegmentId)
}
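// Write the segment file: directly for carbon format, or from the partition
// spec and data files for other formats.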
val writeSegment =
if (isCarbonFormat) {
SegmentFileStore.writeSegmentFile(carbonTable, segment)
} else {
SegmentFileStore.writeSegmentFileForOthers(
carbonTable, segment, partitionSpecOp.orNull, partitionDataFiles.asJava)
}
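// If the segment file was written, mark the segment SUCCESS in the table status file.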
val success = if (writeSegment) {
SegmentFileStore.updateTableStatusFile(
carbonTable,
model.getSegmentId,
segment.getSegmentFileName,
carbonTable.getCarbonTableIdentifier.getTableId,
new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName),
SegmentStatus.SUCCESS,
model.getLatestTableStatusWriteVersion)
} else {
false
}
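// Propagate the latest table status version to the hive table metadata.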
CarbonHiveIndexMetadataUtil.updateTableStatusVersion(carbonTable,
sparkSession,
model.getLatestTableStatusWriteVersion)
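// On success, fire the post status update event so that index commits can run;
// a failure while committing is treated the same as a failed load.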
val postExecutionEvent = if (success) {
val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
new LoadTablePostStatusUpdateEvent(model)
val commitComplete = try {
OperationListenerBus.getInstance()
.fireEvent(loadTablePostStatusUpdateEvent, operationContext)
true
} catch {
case ex: Exception =>
LOGGER.error("Problem while committing indexes", ex)
false
}
commitComplete
} else {
success
}
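// Roll back on failure: mark the entry as failed, delete the segment data,
// remove the written segment file and clear the index cache before aborting.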
if (!postExecutionEvent || !success) {
CarbonLoaderUtil.updateTableStatusForFailure(model, "uniqueTableStatusId")
LOGGER.info("********starting clean up**********")
// deleting the segment is applicable only for transactional tables
CarbonLoaderUtil.deleteSegmentForFailure(model)
// delete the corresponding segment file from the metadata directory
val segmentFile = CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath) +
File.separator + segment.getSegmentFileName
FileFactory.deleteFile(segmentFile)
clearIndexFiles(carbonTable, model.getSegmentId)
LOGGER.info("********clean up done**********")
LOGGER.error("Data load failed due to failure in table status update.")
throw new Exception("Data load failed due to failure in table status update.")
}
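// Disable materialized views on the table, since the added segment is not reflected
// in them, and fire post-load events for the indexes.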
MVManagerInSpark.disableMVOnTable(sparkSession, carbonTable)
CommonLoadUtils.firePostLoadEvents(sparkSession,
model,
tableIndexes,
indexOperationContext,
carbonTable,
operationContext)
}