in integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala [497:680]
/**
 * Repairs the secondary index `indexTableName` of `carbonTable` by reloading SI segments that
 * are missing from, or inconsistent with, the main table.
 *
 * While holding the main table's compaction lock, the main table status and the SI table
 * status are compared. If they differ, up to `repairLimit` segments are collected for reload:
 *  - SI segments that are MARKED_FOR_DELETE while the main table load is valid;
 *  - SI segments in INSERT_IN_PROGRESS / INSERT_OVERWRITE_IN_PROGRESS whose SI segment lock
 *    can be acquired in a single attempt;
 *  - valid main table segments that have no SI entry at all;
 *  - SI segments that are COMPACTED while the main table segment is still SUCCESS (e.g. after
 *    a failed main table compaction). The main table segment lock for these is held until
 *    the reload has finished.
 * The collected segments are reloaded through `LoadToSITable`. A failure of the reload itself
 * is logged and not rethrown, so that the main table operation is not failed.
 *
 * @param indexTableName         name of the SI table to repair
 * @param carbonTable            the main (parent) table
 * @param carbonLoadModel        load model; its load metadata details are replaced with the
 *                               main table status read here
 * @param indexMetadata          index metadata used to look up the index columns
 * @param secondaryIndexProvider index provider name passed to `getIndexColumns`
 * @param repairLimit            maximum number of segments collected for reload
 * @param segments               if defined, only these main table segments are considered
 * @param isLoadOrCompaction     true when invoked from a main table load/compaction; then a
 *                               failed table status read or a busy compaction lock is not
 *                               reported as an exception
 * @param sparkSession           the active Spark session
 * @throws ConcurrentOperationException if the compaction lock cannot be acquired and
 *                                      `isLoadOrCompaction` is false
 */
def processSIRepair(indexTableName: String, carbonTable: CarbonTable,
    carbonLoadModel: CarbonLoadModel, indexMetadata: IndexMetadata,
    secondaryIndexProvider: String, repairLimit: Int,
    segments: Option[List[String]] = Option.empty,
    isLoadOrCompaction: Boolean = false)(sparkSession: SparkSession): Unit = {
  // when SI creation and load to main table are parallel, get the index table from the
  // metastore, which will have the latest index info
  val indexTable = CarbonEnv.getInstance(sparkSession).carbonMetaStore
    .lookupRelation(Some(carbonLoadModel.getDatabaseName), indexTableName)(sparkSession)
    .asInstanceOf[CarbonRelation]
    .carbonTable
  val compactionLock = CarbonLockFactory.getCarbonLockObj(
    carbonTable.getAbsoluteTableIdentifier,
    LockUsage.COMPACTION_LOCK)
  // In some cases, SI table segment might be in COMPACTED state and main table compaction
  // might be still in progress. Taking the main table compaction lock before comparing the
  // table status files avoids repairing SI segments that are actually SUCCESS loads.
  if (compactionLock.lockWithRetries()) {
    // only unlock what was actually acquired
    try {
      val mainTableStatus = try {
        Some(SegmentStatusManager.readTableStatusFile(CarbonTablePath.getTableStatusFilePath(
          carbonTable.getTablePath, carbonTable.getTableStatusVersion)))
      } catch {
        // During load/compaction the repair is best effort and must not fail the main
        // operation; for an explicit repair the exception propagates to the caller.
        case exception: Exception if isLoadOrCompaction =>
          LOGGER.warn(s"Skipping SI repair for index [$indexTableName]: could not read the " +
            s"table status of ${ carbonTable.getTableUniqueName }", exception)
          None
      }
      mainTableStatus.foreach { allMainTableDetails =>
        carbonLoadModel.setLoadMetadataDetails(allMainTableDetails.toList.asJava)
        val mainTableDetails = segments.fold(allMainTableDetails) { segmentIds =>
          allMainTableDetails.filter(detail => segmentIds.contains(detail.getLoadName))
        }
        val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
          SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath,
            indexTable.getTableStatusVersion)
        if (!CarbonInternalLoaderUtil.checkMainTableSegEqualToSiSeg(
          mainTableDetails,
          siTblLoadMetadataDetails)) {
          val indexColumns = indexMetadata.getIndexColumns(secondaryIndexProvider,
            indexTableName)
          val indexModel = IndexModel(Some(carbonTable.getDatabaseName),
            indexMetadata.getParentTableName,
            indexColumns.split(",").toList,
            indexTableName)
          // NOTE(review): an empty SI table status (e.g. not created yet) is not
          // short-circuited: every valid main table segment is then reported as missing by
          // the skipped-segment scan below and reloaded, up to repairLimit. A comment here
          // previously stated the intent to skip repair in that case ("next load will take
          // care"), but the check did nothing. Confirm which behaviour is wanted before
          // adding an early exit.
          val failedLoadMetadataDetails: java.util.List[LoadMetadataDetails] =
            new util.ArrayList[LoadMetadataDetails]()
          // main table segment locks taken below; they are held until the reload finishes
          // and are released even if collecting the segments fails midway
          val segmentLocks = ListBuffer.empty[ICarbonLock]
          try {
            // 1. SI segments that failed during SI load/creation: MARKED_FOR_DELETE, or an
            //    INSERT_IN_PROGRESS entry that no running load currently owns
            siTblLoadMetadataDetails.foreach { siDetail =>
              val isMainTableLoadValid = checkIfMainTableLoadIsValid(mainTableDetails,
                siDetail.getLoadName)
              val canRepair = isMainTableLoadValid &&
                repairLimit > failedLoadMetadataDetails.size()
              siDetail.getSegmentStatus match {
                case SegmentStatus.MARKED_FOR_DELETE if canRepair =>
                  failedLoadMetadataDetails.add(siDetail)
                case SegmentStatus.INSERT_IN_PROGRESS |
                     SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS if canRepair =>
                  val siSegmentLock = CarbonLockFactory
                    .getCarbonLockObj(indexTable.getAbsoluteTableIdentifier,
                      CarbonTablePath.addSegmentPrefix(siDetail.getLoadName) +
                      LockUsage.LOCK)
                  // Single attempt without waiting: if the segment lock is free, the
                  // in-progress entry is treated as a failed load.
                  if (siSegmentLock.lockWithRetries(1, 0)) {
                    try {
                      LOGGER
                        .info("SIFailedLoadListener: Acquired segment lock on segment:" +
                          siDetail.getLoadName)
                      failedLoadMetadataDetails.add(siDetail)
                    } finally {
                      siSegmentLock.unlock()
                      LOGGER
                        .info("SIFailedLoadListener: Released segment lock on segment:" +
                          siDetail.getLoadName)
                    }
                  }
                case _ =>
              }
            }
            // 2. check for the skipped segments: compare the main table and SI table status
            //    and collect valid main table segments that are missing in the SI table
            CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails).asScala
              .foreach { segmentName =>
                if (repairLimit > failedLoadMetadataDetails.size()) {
                  siTblLoadMetadataDetails.find(_.getLoadName == segmentName) match {
                    case None =>
                      val newDetails = new LoadMetadataDetails
                      newDetails.setLoadName(segmentName)
                      LOGGER.info(
                        "Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" +
                        " table " + indexTableName + "." + carbonTable.getTableName)
                      failedLoadMetadataDetails.add(newDetails)
                    case Some(siDetail) =>
                      // If SI table has compacted segments and main table does not have
                      // compacted segments due to some failure while compaction, need to
                      // reload the original segments in this case.
                      val mainTableDetail = mainTableDetails.find(_.getLoadName == segmentName)
                      if (siDetail.getSegmentStatus == SegmentStatus.COMPACTED &&
                          mainTableDetail.exists(_.getSegmentStatus == SegmentStatus.SUCCESS)) {
                        // in concurrent scenario, if a compaction is going on table, then SI
                        // segments are updated first in table status and then the main table
                        // segment, so in any load runs parallel this listener shouldn't
                        // consider those segments accidentally. So try to take the segment
                        // lock.
                        val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
                          .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
                            CarbonTablePath.addSegmentPrefix(segmentName) + LockUsage.LOCK)
                        if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
                          segmentLocks += segmentLockOfProbableOnCompactionSeg
                          // only rewrite the status of a segment that is actually reloaded
                          siDetail.setSegmentStatus(SegmentStatus.SUCCESS)
                          LOGGER.info(
                            "Added in SILoadFailedSegment " + siDetail.getLoadName +
                            " for SI " + "table " + indexTableName + "." +
                            carbonTable.getTableName)
                          failedLoadMetadataDetails.add(siDetail)
                        }
                      }
                  }
                }
              }
            if (!failedLoadMetadataDetails.isEmpty) {
              try {
                // in the case when in SI table it's entry is deleted from the tablestatus
                // file, the corresponding segment folder and .segment file from the metadata
                // folder should also be deleted as it contains the mergefilename which may
                // not exist or is not valid.
                deleteStaleSegmentFileIfPresent(carbonLoadModel,
                  indexTable,
                  failedLoadMetadataDetails)
                CarbonIndexUtil
                  .LoadToSITable(sparkSession,
                    carbonLoadModel,
                    indexTableName,
                    isLoadToFailedSISegments = true,
                    indexModel,
                    carbonTable, indexTable, false, failedLoadMetadataDetails)
              } catch {
                case ex: Exception =>
                  // in case of SI load only for failed segments, catch the exception, but
                  // do not fail the main table load, as main table segments should be
                  // available for query
                  LOGGER.error(s"Load to SI table to $indexTableName is failed " +
                    s"or SI table ENABLE is failed. ", ex)
              }
            }
          } finally {
            segmentLocks.foreach(_.unlock())
          }
        }
      }
    } finally {
      compactionLock.unlock()
    }
  } else {
    LOGGER.error(s"Didn't check failed segments for index [$indexTableName] as compaction " +
      s"is progress on ${ carbonTable.getTableUniqueName }. " +
      s"Please call SI repair again")
    if (!isLoadOrCompaction) {
      throw new ConcurrentOperationException(carbonTable.getDatabaseName,
        carbonTable.getTableName, "compaction", "reindex command")
    }
  }
}