in integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala [66:490]
def createSecondaryIndex(secondaryIndexModel: SecondaryIndexModel,
segmentToLoadStartTimeMap: java.util.Map[String, String],
indexTable: CarbonTable,
forceAccessSegment: Boolean = false,
isCompactionCall: Boolean,
isLoadToFailedSISegments: Boolean,
isInsertOverwrite: Boolean = false):
(CarbonTable, ListBuffer[ICarbonLock], OperationContext) = {
var indexCarbonTable = indexTable
val sc = secondaryIndexModel.sqlContext
// get the thread pool size for secondary index creation
val threadPoolSize = getThreadPoolSize(sc)
LOGGER
.info(s"Configured thread pool size for distributing segments in secondary index creation " +
s"is $threadPoolSize")
// create executor service to parallel run the segments
val executorService = java.util.concurrent.Executors.newFixedThreadPool(threadPoolSize)
if (null == indexCarbonTable) {
// avoid an additional lookupRelation call to resolve the index table
val metastore = CarbonEnv.getInstance(secondaryIndexModel.sqlContext.sparkSession)
.carbonMetaStore
indexCarbonTable = metastore
.lookupRelation(Some(secondaryIndexModel.carbonLoadModel.getDatabaseName),
secondaryIndexModel.secondaryIndex.indexName)(secondaryIndexModel.sqlContext
.sparkSession).carbonTable
}
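// fire the SI load pre-execution event so that registered listeners can validate and
// prepare before the index load starts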
val operationContext = new OperationContext
val loadTableSIPreExecutionEvent: LoadTableSIPreExecutionEvent =
LoadTableSIPreExecutionEvent(secondaryIndexModel.sqlContext.sparkSession,
new CarbonTableIdentifier(indexCarbonTable.getDatabaseName,
indexCarbonTable.getTableName, ""),
secondaryIndexModel.carbonLoadModel,
indexCarbonTable)
OperationListenerBus.getInstance
.fireEvent(loadTableSIPreExecutionEvent, operationContext)
var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
val validSegments: java.util.List[String] = new util.ArrayList[String]()
val skippedSegments: java.util.List[String] = new util.ArrayList[String]()
var validSegmentList = List.empty[String]
try {
for (eachSegment <- secondaryIndexModel.validSegments) {
// take the segment lock before starting the actual load, so that a parallel SI load that
// clears invalid segments does not clear the segment currently being loaded, and so that
// this new segment is not added as a failed segment in SILoadEventListenerForFailedSegments
// (the failed-segments list is validated based on the segment lock)
val segmentLock = CarbonLockFactory
.getCarbonLockObj(indexCarbonTable.getAbsoluteTableIdentifier,
CarbonTablePath.addSegmentPrefix(eachSegment) + LockUsage.LOCK)
if (segmentLock.lockWithRetries(1, 0)) {
segmentLocks += segmentLock
// add only the segments for which the segment lock could be acquired and trigger
// loading for them. Skipped segments are loaded later by
// SILoadEventListenerForFailedSegments
validSegments.add(eachSegment)
} else {
skippedSegments.add(eachSegment)
LOGGER.error(s"Not able to acquire the segment lock for table" +
s" ${indexCarbonTable.getTableUniqueName} for segment: $eachSegment. " +
s"Skipping this segment from loading.")
}
}
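// proceed only with the segments whose locks could be acquired; if none were acquired,
// there is nothing to load now and the skipped segments are left to
// SILoadEventListenerForFailedSegments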
validSegmentList = validSegments.asScala.toList
if (validSegmentList.isEmpty) {
return (indexCarbonTable, segmentLocks, operationContext)
}
LOGGER.info(s"${indexCarbonTable.getTableUniqueName}: SI loading is started " +
s"for segments: $validSegmentList")
var segmentStatus = if (isInsertOverwrite) {
SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
} else {
SegmentStatus.INSERT_IN_PROGRESS
}
FileInternalUtil
.updateTableStatus(validSegmentList,
secondaryIndexModel.carbonLoadModel,
secondaryIndexModel.secondaryIndex.indexName,
segmentStatus,
secondaryIndexModel.segmentIdToLoadStartTimeMapping,
new java.util.HashMap[String, String](),
indexCarbonTable,
sc.sparkSession)
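// record the latest table status write version of this load on the SI table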
CarbonHiveIndexMetadataUtil.updateTableStatusVersion(indexCarbonTable,
sc.sparkSession,
secondaryIndexModel.carbonLoadModel.getLatestTableStatusWriteVersion)
var execInstance = "1"
// in case of non-dynamic executor allocation, the number of executors is fixed.
if (sc.sparkContext.getConf.contains("spark.executor.instances")) {
execInstance = sc.sparkContext.getConf.get("spark.executor.instances")
LOGGER.info("spark.executor.instances property is set to =" + execInstance)
}
// in case of dynamic executor allocation, take the maximum executors
// configured for the dynamic allocation.
else if (sc.sparkContext.getConf.contains("spark.dynamicAllocation.enabled")) {
if (sc.sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
.equalsIgnoreCase("true")) {
execInstance = sc.sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
LOGGER.info("spark.dynamicAllocation.maxExecutors property is set to =" + execInstance)
}
}
var successSISegments: List[String] = List()
var failedSISegments: List[String] = List()
val carbonLoadModel: CarbonLoadModel = getCopyObject(secondaryIndexModel)
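// the load strategy depends on the sort scope of the SI table: for global_sort the SI
// segments are built by querying the main table as a DataFrame and loading with global
// sort, otherwise CarbonSecondaryIndexRDD builds each SI segment from the main table segment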
val sort_scope = indexCarbonTable.getTableInfo.getFactTable.getTableProperties
.get("sort_scope")
if (sort_scope != null && sort_scope.equalsIgnoreCase("global_sort")) {
val mainTable = secondaryIndexModel.carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
var futureObjectList = List[java.util.concurrent.Future[Array[(String,
(LoadMetadataDetails, ExecutionErrors))]]]()
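// submit one global-sort load task per valid segment to the executor service and
// collect the futures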
for (eachSegment <- validSegmentList) {
futureObjectList :+= executorService
.submit(new Callable[Array[(String, (LoadMetadataDetails, ExecutionErrors))]] {
@throws(classOf[Exception])
override def call(): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
// to load a global sort SI segment, the main table has to be queried
// with the position reference projection included
val projections = indexCarbonTable.getCreateOrderColumn
.asScala
.map(_.getColName)
.filterNot(_.equalsIgnoreCase(CarbonCommonConstants.POSITION_REFERENCE)).toSet
val explodeColumn = mainTable.getCreateOrderColumn.asScala
.filter(x => x.getDataType.isComplexType &&
projections.contains(x.getColName))
// the SI column data is read from the main table here, so positionReference must be
// computed as well, because an SI segment stores the SI columns plus positionReference.
var dataFrame = dataFrameOfSegments(sc.sparkSession,
mainTable,
projections.mkString(","),
Array(eachSegment),
isPositionReferenceRequired = true, explodeColumn.nonEmpty)
// flatten the complex-type SI column by exploding it
if (explodeColumn.nonEmpty) {
val columns = dataFrame.schema.map { x =>
if (x.name.equals(explodeColumn.head.getColName)) {
functions.explode_outer(functions.col(x.name))
} else {
functions.col(x.name)
}
}
dataFrame = dataFrame.select(columns: _*)
}
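// point the load model at the SI table and reuse the load start time recorded for this
// main table segment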
val dataLoadSchema = new CarbonDataLoadSchema(indexCarbonTable)
carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
carbonLoadModel.setTableName(indexCarbonTable.getTableName)
carbonLoadModel.setDatabaseName(indexCarbonTable.getDatabaseName)
carbonLoadModel.setTablePath(indexCarbonTable.getTablePath)
carbonLoadModel.setFactTimeStamp(secondaryIndexModel
.segmentIdToLoadStartTimeMapping(eachSegment))
carbonLoadModel.setSegmentId(eachSegment)
var result: Array[(String, (LoadMetadataDetails, ExecutionErrors))] = null
try {
val configuration = FileFactory.getConfiguration
configuration.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, eachSegment)
val currentSegmentFileName = if (mainTable.isHivePartitionTable) {
eachSegment + CarbonCommonConstants.UNDERSCORE +
carbonLoadModel.getFactTimeStamp
} else {
null
}
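// locate the CarbonScanRDD behind this DataFrame and pass it the current segment file
// name (non-null only for hive partition tables)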
findCarbonScanRDD(dataFrame.rdd, currentSegmentFileName)
// accumulator to collect segment metadata
val segmentMetaDataAccumulator = sc.sparkSession.sqlContext
.sparkContext
.collectionAccumulator[Map[String, SegmentMetaDataInfo]]
// TODO: use the new insert-into flow and prepare RDD[InternalRow] instead of a DataFrame
result = DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
sc.sparkSession,
Some(dataFrame),
carbonLoadModel,
hadoopConf = configuration, segmentMetaDataAccumulator)
}
segmentToLoadStartTimeMap
.put(eachSegment, String.valueOf(carbonLoadModel.getFactTimeStamp))
result
}
})
}
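// group the per-segment results by their final segment status to separate successfully
// loaded SI segments from failed ones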
val segmentSecondaryIndexCreationStatus = futureObjectList.filter(_.get().length > 0)
.groupBy(a => a.get().head._2._1.getSegmentStatus)
val hasSuccessSegments =
segmentSecondaryIndexCreationStatus.contains(SegmentStatus.LOAD_PARTIAL_SUCCESS) ||
segmentSecondaryIndexCreationStatus.contains(SegmentStatus.SUCCESS)
val hasFailedSegments = segmentSecondaryIndexCreationStatus
.contains(SegmentStatus.MARKED_FOR_DELETE)
if (hasSuccessSegments) {
successSISegments =
segmentSecondaryIndexCreationStatus(SegmentStatus.SUCCESS).collect {
case segments: java.util.concurrent.Future[Array[(String, (LoadMetadataDetails,
ExecutionErrors))]] =>
segments.get().head._2._1.getLoadName
}
}
if (hasFailedSegments) {
// if the call is from compaction, the main table compaction must also fail; likewise, if
// the load is triggered from SILoadEventListener for the corresponding main table segment,
// a failed SI load must fail the main table load as well, so throw an exception.
// if the load is triggered from SI creation or SILoadEventListenerForFailedSegments, there
// is no need to fail: just mark the segment as marked for delete, so that the next load to
// the main table takes care of it
if (isCompactionCall || !isLoadToFailedSISegments) {
throw new Exception("Secondary index creation failed")
} else {
failedSISegments =
segmentSecondaryIndexCreationStatus(SegmentStatus.MARKED_FOR_DELETE).collect {
case segments: java.util.concurrent.Future[Array[(String, (LoadMetadataDetails,
ExecutionErrors))]] =>
segments.get().head._1
}
}
}
} else {
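// non global-sort flow: build each SI segment from the corresponding main table segment
// using CarbonSecondaryIndexRDD, one task per segment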
var futureObjectList = List[java.util.concurrent.Future[Array[(String, Boolean)]]]()
for (eachSegment <- validSegmentList) {
val segId = eachSegment
futureObjectList :+= executorService.submit(new Callable[Array[(String, Boolean)]] {
@throws(classOf[Exception])
override def call(): Array[(String, Boolean)] = {
ThreadLocalSessionInfo.getOrCreateCarbonSessionInfo().getNonSerializableExtraInfo
.put("carbonConf", SparkSQLUtil.sessionState(sc.sparkSession).newHadoopConf())
var eachSegmentSecondaryIndexCreationStatus: Array[(String, Boolean)] = Array.empty
CarbonLoaderUtil.checkAndCreateCarbonDataLocation(segId, indexCarbonTable)
carbonLoadModel
.setFactTimeStamp(secondaryIndexModel.segmentIdToLoadStartTimeMapping(eachSegment))
carbonLoadModel.setTablePath(secondaryIndexModel.carbonTable.getTablePath)
val secondaryIndexCreationStatus = new CarbonSecondaryIndexRDD(sc.sparkSession,
new SecondaryIndexCreationResultImpl,
carbonLoadModel,
secondaryIndexModel.secondaryIndex,
segId, execInstance, indexCarbonTable, forceAccessSegment).collect()
segmentToLoadStartTimeMap.put(segId, carbonLoadModel.getFactTimeStamp.toString)
if (secondaryIndexCreationStatus.length > 0) {
eachSegmentSecondaryIndexCreationStatus = secondaryIndexCreationStatus
}
eachSegmentSecondaryIndexCreationStatus
}
})
}
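// group the per-segment results by the boolean creation status to separate successful
// and failed segments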
val segmentSecondaryIndexCreationStatus = futureObjectList.filter(_.get().length > 0)
.groupBy(a => a.get().head._2)
val hasSuccessSegments = segmentSecondaryIndexCreationStatus.contains(true)
val hasFailedSegments = segmentSecondaryIndexCreationStatus.contains(false)
if (hasSuccessSegments) {
successSISegments =
segmentSecondaryIndexCreationStatus("true".toBoolean).collect {
case segments: java.util.concurrent.Future[Array[(String, Boolean)]] =>
segments.get().head._1
}
}
if (hasFailedSegments) {
// if the call is from compaction, the main table compaction must also fail; likewise, if
// the load is triggered from SILoadEventListener for the corresponding main table segment,
// a failed SI load must fail the main table load as well, so throw an exception.
// if the load is triggered from SI creation or SILoadEventListenerForFailedSegments, there
// is no need to fail: just mark the segment as marked for delete, so that the next load to
// the main table takes care of it
if (isCompactionCall || !isLoadToFailedSISegments) {
throw new Exception("Secondary index creation failed")
} else {
failedSISegments =
segmentSecondaryIndexCreationStatus("false".toBoolean).collect {
case segments: java.util.concurrent.Future[Array[(String, Boolean)]] =>
segments.get().head._1
}
}
}
}
// only the segments for which the load failed need their status set to marked
// for delete; the remaining segments stay as SUCCESS
var tableStatusUpdateForSuccess = false
var tableStatusUpdateForFailure = false
if (successSISegments.nonEmpty && !isCompactionCall) {
// merge index files for the successful segments (only in the load flow, not compaction)
CarbonMergeFilesRDD.mergeIndexFiles(secondaryIndexModel.sqlContext.sparkSession,
successSISegments,
segmentToLoadStartTimeMap,
indexCarbonTable.getTablePath,
indexCarbonTable, mergeIndexProperty = false)
val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
indexCarbonTable.getMetadataPath, indexCarbonTable.getTableStatusVersion)
.filter(loadMetadataDetail => successSISegments.contains(loadMetadataDetail.getLoadName))
val carbonLoadModelForMergeDataFiles = SecondaryIndexUtil
.getCarbonLoadModel(indexCarbonTable,
loadMetadataDetails.toList.asJava,
System.currentTimeMillis(),
CarbonIndexUtil
.getCompressorForIndexTable(indexCarbonTable, secondaryIndexModel.carbonTable))
carbonLoadModelForMergeDataFiles.setLatestTableStatusWriteVersion(secondaryIndexModel
.carbonLoadModel.getLatestTableStatusWriteVersion)
// merge the data files of the loaded segments; index files are merged
// inside this call as well, if needed
val rebuiltSegments = SecondaryIndexUtil
.mergeDataFilesSISegments(secondaryIndexModel.segmentIdToLoadStartTimeMapping,
indexCarbonTable,
loadMetadataDetails.toList.asJava, carbonLoadModelForMergeDataFiles)(sc)
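// for insert overwrite, every segment other than the newly loaded ones has been
// overridden in the main table, so mark those segments as marked for delete in the SI
// table status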
if (isInsertOverwrite) {
val overriddenSegments = SegmentStatusManager.readLoadMetadata(
indexCarbonTable.getMetadataPath, indexCarbonTable.getTableStatusVersion)
.filter(loadMetadata => !successSISegments.contains(loadMetadata.getLoadName))
.map(_.getLoadName).toList
FileInternalUtil
.updateTableStatus(
overriddenSegments,
secondaryIndexModel.carbonLoadModel,
secondaryIndexModel.secondaryIndex.indexName,
SegmentStatus.MARKED_FOR_DELETE,
secondaryIndexModel.segmentIdToLoadStartTimeMapping,
segmentToLoadStartTimeMap,
indexTable,
secondaryIndexModel.sqlContext.sparkSession)
}
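// when the data-file merge did not rebuild any segment, write the segment files here and
// mark the loaded segments as SUCCESS in the SI table status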
if (rebuiltSegments.isEmpty) {
for (loadMetadata <- loadMetadataDetails) {
SegmentFileStore
.writeSegmentFile(indexCarbonTable, loadMetadata.getLoadName,
String.valueOf(loadMetadata.getLoadStartTime))
}
tableStatusUpdateForSuccess = FileInternalUtil.updateTableStatus(
successSISegments,
secondaryIndexModel.carbonLoadModel,
secondaryIndexModel.secondaryIndex.indexName,
SegmentStatus.SUCCESS,
secondaryIndexModel.segmentIdToLoadStartTimeMapping,
segmentToLoadStartTimeMap,
indexCarbonTable,
secondaryIndexModel.sqlContext.sparkSession,
carbonLoadModelForMergeDataFiles.getFactTimeStamp,
rebuiltSegments)
}
}
if (!isCompactionCall) {
// trigger index pre-priming for the SI table
DistributedRDDUtils.triggerPrepriming(secondaryIndexModel.sqlContext.sparkSession,
indexCarbonTable,
Seq(),
operationContext,
FileFactory.getConfiguration,
validSegments.asScala.toList)
}
// if the data load fails, mark all these segments as marked for delete, so that the next
// SI table load, triggered in the post-load event of the main table, clears the
// marked-for-delete segments and re-triggers the load for those segments
if (failedSISegments.nonEmpty && !isCompactionCall) {
tableStatusUpdateForFailure = FileInternalUtil.updateTableStatus(
failedSISegments,
secondaryIndexModel.carbonLoadModel,
secondaryIndexModel.secondaryIndex.indexName,
SegmentStatus.MARKED_FOR_DELETE,
secondaryIndexModel.segmentIdToLoadStartTimeMapping,
segmentToLoadStartTimeMap,
indexCarbonTable,
secondaryIndexModel.sqlContext.sparkSession)
}
if (failedSISegments.nonEmpty) {
LOGGER.error("Dataload to secondary index creation has failed")
}
CarbonHiveIndexMetadataUtil.updateTableStatusVersion(indexCarbonTable,
secondaryIndexModel.sqlContext.sparkSession,
secondaryIndexModel.carbonLoadModel.getLatestTableStatusWriteVersion)
if (!isCompactionCall) {
val loadTableSIPostExecutionEvent: LoadTableSIPostExecutionEvent =
LoadTableSIPostExecutionEvent(sc.sparkSession,
indexCarbonTable.getCarbonTableIdentifier,
secondaryIndexModel.carbonLoadModel,
indexCarbonTable)
OperationListenerBus.getInstance
.fireEvent(loadTableSIPostExecutionEvent, operationContext)
}
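// in the compaction flow return the acquired segment locks so the caller can release
// them; in the load flow the locks are released in the finally block below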
if (isCompactionCall) {
(indexCarbonTable, segmentLocks, operationContext)
} else {
(indexCarbonTable, ListBuffer.empty, operationContext)
}
} catch {
case ex: Exception =>
LOGGER.error("Load to SI table failed", ex)
if (isCompactionCall) {
segmentLocks.foreach(segmentLock => segmentLock.unlock())
}
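// on failure, mark all the attempted segments as marked for delete so that a subsequent
// load or repair can rebuild them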
FileInternalUtil
.updateTableStatus(validSegmentList,
secondaryIndexModel.carbonLoadModel,
secondaryIndexModel.secondaryIndex.indexName,
SegmentStatus.MARKED_FOR_DELETE,
secondaryIndexModel.segmentIdToLoadStartTimeMapping,
new java.util.HashMap[String, String](),
indexCarbonTable,
sc.sparkSession)
throw ex
} finally {
// close the executor service
if (null != executorService) {
executorService.shutdownNow()
}
// release the segment locks only in the load flow
if (!isCompactionCall) {
segmentLocks.foreach(segmentLock => {
segmentLock.unlock()
})
}
}
}