in integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala [283:568]
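/**
 * Loads data into the carbon table using the flow that matches the input source (data files,
 * DataFrame or pre-scanned RDD of InternalRow) and the table's sort scope, then writes the
 * segment file and updates the table status to commit the new segment, or marks the segment
 * for delete and cleans it up when the load or the status update fails.
 */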
def loadCarbonData(
sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel,
partitionStatus: SegmentStatus = SegmentStatus.SUCCESS,
overwriteTable: Boolean,
hadoopConf: Configuration,
dataFrame: Option[DataFrame] = None,
scanResultRdd : Option[RDD[InternalRow]] = None,
updateModel: Option[UpdateTableModel] = None,
operationContext: OperationContext): LoadMetadataDetails = {
// Check if any load needs to be deleted before loading the new data
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
var status: Array[(String, (LoadMetadataDetails, ExecutionErrors))] = null
var res: Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = null
// accumulator to collect segment metadata
val segmentMetaDataAccumulator = sqlContext
.sparkContext
.collectionAccumulator[Map[String, SegmentMetaDataInfo]]
// create a new segment folder in the carbon store
if (updateModel.isEmpty && carbonLoadModel.isCarbonTransactionalTable ||
updateModel.isDefined) {
CarbonLoaderUtil.checkAndCreateCarbonDataLocation(carbonLoadModel.getSegmentId, carbonTable)
}
var loadStatus = SegmentStatus.SUCCESS
var errorMessage: String = "DataLoad failure"
var executorMessage: String = ""
val isSortTable = carbonTable.getNumberOfSortColumns > 0
val sortScope = CarbonDataProcessorUtil.getSortScope(carbonLoadModel.getSortScope)
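// segment-level lock to prevent concurrent operations from acting on the same segment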
val segmentLock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
CarbonTablePath.addSegmentPrefix(carbonLoadModel.getSegmentId) + LockUsage.LOCK)
// dataFrame.get.rdd.isEmpty() will launch a job, so avoid calling it multiple times
val isEmptyDataframe = updateModel.isDefined && dataFrame.get.rdd.isEmpty()
try {
if (!carbonLoadModel.isCarbonTransactionalTable || segmentLock.lockWithRetries()) {
if (isEmptyDataframe) {
// if there are no rows to be updated, mark the created segment as marked for delete and return
CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, "")
} else {
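// choose the load flow based on the input source and the table's sort scope; each flow
// returns the per-task load status used below to decide the overall segment status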
status = if (scanResultRdd.isDefined) {
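// convert the pre-scanned InternalRow RDD using the visible, non-complex columns of the fact table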
val colSchema = carbonLoadModel
.getCarbonDataLoadSchema
.getCarbonTable
.getTableInfo
.getFactTable
.getListOfColumns
.asScala
.filterNot(col => col.isInvisible || col.isComplexColumn)
val convertedRdd = CommonLoadUtils.getConvertedInternalRow(
colSchema,
scanResultRdd.get,
isGlobalSortPartition = false)
if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT) &&
!carbonLoadModel.isNonSchemaColumnsPresent) {
DataLoadProcessBuilderOnSpark.insertDataUsingGlobalSortWithInternalRow(sqlContext
.sparkSession,
convertedRdd,
carbonLoadModel,
hadoopConf,
segmentMetaDataAccumulator)
} else if (sortScope.equals(SortScopeOptions.SortScope.NO_SORT)) {
loadDataFrameForNoSort(sqlContext,
None,
Some(convertedRdd),
carbonLoadModel,
segmentMetaDataAccumulator)
} else {
loadDataFrame(sqlContext,
None,
Some(convertedRdd),
carbonLoadModel,
segmentMetaDataAccumulator)
}
} else {
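// data comes from a DataFrame or from data files: use range sort when a range partition
// column is configured, global sort when the sort scope requires it, otherwise the plain
// DataFrame or file based load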
if (dataFrame.isEmpty && isSortTable &&
carbonLoadModel.getRangePartitionColumn != null &&
(sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT) ||
sortScope.equals(SortScopeOptions.SortScope.LOCAL_SORT))) {
DataLoadProcessBuilderOnSpark
.loadDataUsingRangeSort(sqlContext.sparkSession,
carbonLoadModel,
hadoopConf,
segmentMetaDataAccumulator)
} else if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkSession,
dataFrame,
carbonLoadModel,
hadoopConf,
segmentMetaDataAccumulator)
} else if (dataFrame.isDefined) {
loadDataFrame(sqlContext,
dataFrame,
None,
carbonLoadModel,
segmentMetaDataAccumulator)
} else {
loadDataFile(sqlContext, carbonLoadModel, hadoopConf, segmentMetaDataAccumulator)
}
}
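// aggregate the per-task statuses returned by the load into a single status for this segment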
val newStatusMap = scala.collection.mutable.Map.empty[String, SegmentStatus]
if (status.nonEmpty) {
status.foreach { eachLoadStatus =>
val state = newStatusMap.get(eachLoadStatus._1)
state match {
case Some(SegmentStatus.LOAD_FAILURE) =>
newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
case Some(SegmentStatus.LOAD_PARTIAL_SUCCESS)
if eachLoadStatus._2._1.getSegmentStatus ==
SegmentStatus.SUCCESS =>
newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
case _ =>
newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
}
}
newStatusMap.foreach {
case (key, value) =>
if (value == SegmentStatus.LOAD_FAILURE) {
loadStatus = SegmentStatus.LOAD_FAILURE
} else if (value == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
loadStatus != SegmentStatus.LOAD_FAILURE) {
loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS
}
}
} else {
// if the data load returned no status entries, treat an empty input as a successful
// (no-op) load, otherwise treat it as a load failure
if ((dataFrame.isDefined || scanResultRdd.isDefined) && updateModel.isEmpty) {
if (dataFrame.isDefined) {
val rdd = dataFrame.get.rdd
if (rdd.partitions == null || rdd.partitions.length == 0) {
LOGGER.warn("DataLoading finished. No data was loaded.")
loadStatus = SegmentStatus.SUCCESS
}
} else {
if (scanResultRdd.get.partitions == null ||
scanResultRdd.get.partitions.length == 0) {
LOGGER.warn("DataLoading finished. No data was loaded.")
loadStatus = SegmentStatus.SUCCESS
}
}
} else {
loadStatus = SegmentStatus.LOAD_FAILURE
}
}
if (loadStatus != SegmentStatus.LOAD_FAILURE &&
partitionStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
loadStatus = partitionStatus
}
}
}
} catch {
case ex: Throwable =>
loadStatus = SegmentStatus.LOAD_FAILURE
val (extrMsgLocal, errorMsgLocal) = CarbonScalaUtil.retrieveAndLogErrorMsg(ex, LOGGER)
executorMessage = extrMsgLocal
errorMessage = errorMsgLocal
LOGGER.info(errorMessage)
LOGGER.error(ex)
}
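// commit phase: write the segment file and the table status entry, or clean up the segment on failure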
var isLoadingCommitted = false
try {
val uniqueTableStatusId = Option(operationContext.getProperty("uuid")).getOrElse("")
.asInstanceOf[String]
if (loadStatus == SegmentStatus.LOAD_FAILURE) {
// update the load entry in the table status file to change the status to marked for delete
CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
LOGGER.info("********starting clean up**********")
if (carbonLoadModel.isCarbonTransactionalTable) {
// segment deletion is applicable only for transactional tables
CarbonLoaderUtil.deleteSegmentForFailure(carbonLoadModel)
clearIndexFiles(carbonTable, carbonLoadModel.getSegmentId)
}
LOGGER.info("********clean up done**********")
LOGGER.warn("Cannot write load metadata file as data load failed")
throw new Exception(errorMessage)
} else {
// check whether the data load failed due to bad records and, if the configured bad records
// action is FAIL, throw a data load failure exception
if (loadStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
status(0)._2._2.failureCauses == FailureCauses.BAD_RECORDS &&
carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
// update the load entry in the table status file to change the status to marked for delete
CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
LOGGER.info("********starting clean up**********")
if (carbonLoadModel.isCarbonTransactionalTable) {
// segment deletion is applicable only for transactional tables
CarbonLoaderUtil.deleteSegmentForFailure(carbonLoadModel)
clearIndexFiles(carbonTable, carbonLoadModel.getSegmentId)
}
LOGGER.info("********clean up done**********")
throw new Exception(status(0)._2._2.errorMsg)
}
if (isEmptyDataframe) {
return null
}
// if no record was loaded into the new segment, the new segment should be marked for delete
val newEntryLoadStatus =
if (carbonLoadModel.isCarbonTransactionalTable &&
!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isMV &&
!CarbonLoaderUtil.isValidSegment(carbonLoadModel,
carbonLoadModel.getSegmentId.toInt)) {
LOGGER.warn("Cannot write load metadata file as there is no data to load")
SegmentStatus.MARKED_FOR_DELETE
} else {
loadStatus
}
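// collect the segment metadata gathered by the executors through the accumulator and reset it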
val segmentMetaDataInfo = CommonLoadUtils.getSegmentMetaDataInfoFromAccumulator(
carbonLoadModel.getSegmentId,
segmentMetaDataAccumulator)
segmentMetaDataAccumulator.reset()
operationContext.setProperty(carbonTable.getTableUniqueName + "_Segment",
carbonLoadModel.getSegmentId)
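// fire the pre status update event so that registered listeners can act before the table status is written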
val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
new LoadTablePreStatusUpdateEvent(
carbonTable.getCarbonTableIdentifier,
carbonLoadModel)
OperationListenerBus.getInstance()
.fireEvent(loadTablePreStatusUpdateEvent, operationContext)
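// write the segment file for the new segment and then record its entry in the table status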
val segmentFileName =
SegmentFileStore.writeSegmentFile(carbonTable, carbonLoadModel.getSegmentId,
String.valueOf(carbonLoadModel.getFactTimeStamp), segmentMetaDataInfo)
val (done, writtenSegment) =
updateTableStatus(
sqlContext.sparkSession,
status,
carbonLoadModel,
newEntryLoadStatus,
overwriteTable,
segmentFileName,
updateModel,
uniqueTableStatusId)
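// fire the post status update event; a listener failure is treated as a commit failure below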
val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
new LoadTablePostStatusUpdateEvent(carbonLoadModel)
val commitComplete = try {
OperationListenerBus.getInstance()
.fireEvent(loadTablePostStatusUpdateEvent, operationContext)
true
} catch {
case ex: Exception =>
LOGGER.error("Problem while committing indexes", ex)
false
}
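// roll back the new segment if the table status update or the post status update event failed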
if (!done || !commitComplete) {
CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
LOGGER.info("********starting clean up**********")
if (carbonLoadModel.isCarbonTransactionalTable) {
// segment deletion is applicable only for transactional tables
CarbonLoaderUtil.deleteSegmentForFailure(carbonLoadModel)
// delete the corresponding segment file from the metadata directory
val segmentFile =
CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath) +
File.separator + segmentFileName
FileFactory.deleteFile(segmentFile)
clearIndexFiles(carbonTable, carbonLoadModel.getSegmentId)
}
LOGGER.info("********clean up done**********")
LOGGER.error("Data load failed due to failure in table status update.")
throw new Exception("Data load failed due to failure in table status update.")
}
if (SegmentStatus.LOAD_PARTIAL_SUCCESS == loadStatus) {
LOGGER.info("Data load is partially successful for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
} else {
LOGGER.info("Data load is successful for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
}
isLoadingCommitted = true
writtenSegment
}
} finally {
// Release the segment lock once the table status has finally been updated
segmentLock.unlock()
if (isLoadingCommitted) {
triggerEventsAfterLoading(sqlContext,
carbonLoadModel,
hadoopConf,
operationContext,
updateModel.isDefined)
}
}
}