def processSIRepair()

in integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala [497:680]


  /**
   * Repairs a secondary index (SI) table by reloading the SI segments that are failed or
   * missing when the SI table status is compared with the main table status.
   *
   * The main table compaction lock is taken first. While it is held, the main table status
   * and the SI table status are read and, unless
   * `CarbonInternalLoaderUtil.checkMainTableSegEqualToSiSeg` reports them equal, at most
   * `repairLimit` segments are collected and loaded into the SI table:
   *  - SI segments in MARKED_FOR_DELETE state whose main table load is valid,
   *  - SI segments in INSERT_IN_PROGRESS / INSERT_OVERWRITE_IN_PROGRESS state whose main
   *    table load is valid and whose SI segment lock can be taken,
   *  - valid main table segments that have no entry in the SI table status,
   *  - SI segments in COMPACTED state whose main table segment is still SUCCESS.
   *
   * A failure of the SI load itself is only logged, so that the main table operation is
   * not failed because of it.
   *
   * @param indexTableName         name of the SI table to repair
   * @param carbonTable            main table
   * @param carbonLoadModel        load model; its load metadata details are replaced with the
   *                               complete (unfiltered) main table status
   * @param indexMetadata          index metadata used to resolve the index columns and the
   *                               parent table name
   * @param secondaryIndexProvider provider name used to look up the index columns
   * @param repairLimit            maximum number of segments collected for repair
   * @param segments               when defined, only these main table segments are considered
   * @param isLoadOrCompaction     when true, a failure to read the main table status or to
   *                               take the compaction lock ends the repair silently instead
   *                               of throwing
   * @param sparkSession           active spark session
   * @throws ConcurrentOperationException if the compaction lock cannot be taken and
   *                                      `isLoadOrCompaction` is false
   */
  def processSIRepair(indexTableName: String, carbonTable: CarbonTable,
      carbonLoadModel: CarbonLoadModel, indexMetadata: IndexMetadata,
      secondaryIndexProvider: String, repairLimit: Int,
      segments: Option[List[String]] = Option.empty,
      isLoadOrCompaction: Boolean = false)(sparkSession: SparkSession): Unit = {
    // when Si creation and load to main table are parallel, get the carbonTable from the
    // metastore which will have the latest index Info
    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
    val indexTable = metaStore
      .lookupRelation(Some(carbonLoadModel.getDatabaseName), indexTableName)(
        sparkSession)
      .asInstanceOf[CarbonRelation]
      .carbonTable

    // main table segment locks taken while scanning; released once the SI load is over
    val segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
    val compactionLock = CarbonLockFactory.getCarbonLockObj(
      carbonTable.getAbsoluteTableIdentifier,
      LockUsage.COMPACTION_LOCK)
    try {
      // In some cases, SI table segment might be in COMPACTED state and main table
      // compaction might be still in progress. In those cases, we can try to take compaction lock
      // on main table and then compare and add SI segments to failedLoads, to avoid repair
      // SI SUCCESS loads.
      if (compactionLock.lockWithRetries()) {
        val allMainTableDetails = try {
          SegmentStatusManager.readTableStatusFile(CarbonTablePath.getTableStatusFilePath(
            carbonTable.getTablePath, carbonTable.getTableStatusVersion))
        } catch {
          case exception: Exception =>
            if (!isLoadOrCompaction) {
              throw exception
            }
            // during load/compaction an unreadable table status only skips the repair
            return
        }
        // the load model always gets the complete status, independent of `segments`
        carbonLoadModel.setLoadMetadataDetails(allMainTableDetails.toList.asJava)
        val mainTableDetails = segments match {
          case Some(requestedSegments) =>
            allMainTableDetails.filter(
              loadMetaDataDetails => requestedSegments.contains(loadMetaDataDetails.getLoadName))
          case None => allMainTableDetails
        }
        val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
          SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath,
            indexTable.getTableStatusVersion)
        if (!CarbonInternalLoaderUtil.checkMainTableSegEqualToSiSeg(
          mainTableDetails,
          siTblLoadMetadataDetails)) {
          val indexColumns = indexMetadata.getIndexColumns(secondaryIndexProvider,
            indexTableName)
          val indexModel = IndexModel(Some(carbonTable.getDatabaseName),
            indexMetadata.getParentTableName,
            indexColumns.split(",").toList,
            indexTableName)

          // NOTE: an empty SI table status is not special-cased here; every valid main
          // table segment is then picked up below as a segment missing from the SI table.
          val failedLoadMetadataDetails: java.util.List[LoadMetadataDetails] =
            new util.ArrayList[LoadMetadataDetails]()

          // read the details of SI table and get all the failed segments during SI
          // creation which are MARKED_FOR_DELETE or invalid INSERT_IN_PROGRESS
          siTblLoadMetadataDetails.foreach { loadMetaDetail =>
            val isMainTableLoadValid = checkIfMainTableLoadIsValid(mainTableDetails,
              loadMetaDetail.getLoadName)
            val segmentStatus = loadMetaDetail.getSegmentStatus
            val canRepairMore = repairLimit > failedLoadMetadataDetails.size()
            if (segmentStatus == SegmentStatus.MARKED_FOR_DELETE &&
                isMainTableLoadValid && canRepairMore) {
              failedLoadMetadataDetails.add(loadMetaDetail)
            } else if ((segmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
                        segmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) &&
                       isMainTableLoadValid && canRepairMore) {
              // an in-progress SI segment is repaired only if its segment lock can be
              // taken right now (single attempt, no wait)
              val segmentLock = CarbonLockFactory
                .getCarbonLockObj(indexTable.getAbsoluteTableIdentifier,
                  CarbonTablePath.addSegmentPrefix(loadMetaDetail.getLoadName) +
                  LockUsage.LOCK)
              var segmentLockAcquired = false
              try {
                segmentLockAcquired = segmentLock.lockWithRetries(1, 0)
                if (segmentLockAcquired) {
                  LOGGER.info("SIFailedLoadListener: Acquired segment lock on segment:" +
                              loadMetaDetail.getLoadName)
                  failedLoadMetadataDetails.add(loadMetaDetail)
                }
              } finally {
                // unlock() is still called unconditionally, as before.
                // NOTE(review): presumably lock implementations tolerate (and may rely on)
                // unlock() after a failed acquisition - confirm before guarding this call.
                segmentLock.unlock()
                if (segmentLockAcquired) {
                  LOGGER.info("SIFailedLoadListener: Released segment lock on segment:" +
                              loadMetaDetail.getLoadName)
                }
              }
            }
          }

          try {
            // check for the skipped segments. compare the main table and SI table table
            // status file and get the skipped segments if any
            CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails).asScala
              .foreach { validSegmentName =>
                if (repairLimit > failedLoadMetadataDetails.size()) {
                  val siDetail = siTblLoadMetadataDetails
                    .find(metadata => metadata.getLoadName.equals(validSegmentName))
                  val mainTableDetail = mainTableDetails
                    .find(metadata => metadata.getLoadName.equals(validSegmentName))
                  (siDetail, mainTableDetail) match {
                    case (None, _) =>
                      // segment is missing from the SI table status
                      val newDetails = new LoadMetadataDetails
                      newDetails.setLoadName(validSegmentName)
                      LOGGER.info(
                        s"Added in SILoadFailedSegment ${ newDetails.getLoadName } for SI" +
                        s" table $indexTableName.${ carbonTable.getTableName }")
                      failedLoadMetadataDetails.add(newDetails)
                    case (Some(siSegment), Some(mainSegment))
                      if siSegment.getSegmentStatus == SegmentStatus.COMPACTED &&
                         mainSegment.getSegmentStatus == SegmentStatus.SUCCESS =>
                      // If SI table has compacted segments and main table does not have
                      // compacted segments due to some failure while compaction, need to
                      // reload the original segments in this case.
                      siSegment.setSegmentStatus(SegmentStatus.SUCCESS)
                      // in concurrent scenario, if a compaction is going on table, then SI
                      // segments are updated first in table status and then the main table
                      // segment, so in any load runs parallel this listener shouldn't
                      // consider those segments accidentally. So try to take the segment
                      // lock.
                      val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
                        .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
                          CarbonTablePath.addSegmentPrefix(mainSegment.getLoadName) +
                          LockUsage.LOCK)
                      if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
                        segmentLocks += segmentLockOfProbableOnCompactionSeg
                        LOGGER.info(
                          s"Added in SILoadFailedSegment ${ siSegment.getLoadName } for SI " +
                          s"table $indexTableName.${ carbonTable.getTableName }")
                        failedLoadMetadataDetails.add(siSegment)
                      }
                    case _ => // SI segment is present and needs no repair
                  }
                }
              }

            try {
              if (!failedLoadMetadataDetails.isEmpty) {
                // in the case when in SI table it's entry is deleted from the tablestatus
                // file, the corresponding segment folder and .segment file from
                // the metadata folder should also be deleted as it contains the
                // mergefilename which may not exist or is not valid.
                deleteStaleSegmentFileIfPresent(carbonLoadModel,
                  indexTable,
                  failedLoadMetadataDetails)
                CarbonIndexUtil
                  .LoadToSITable(sparkSession,
                    carbonLoadModel,
                    indexTableName,
                    isLoadToFailedSISegments = true,
                    indexModel,
                    carbonTable, indexTable, false, failedLoadMetadataDetails)
              }
            } catch {
              case ex: Exception =>
                // in case of SI load only for for failed segments, catch the exception, but
                // do not fail the main table load, as main table segments should be available
                // for query
                LOGGER.error(s"Load to SI table to $indexTableName is failed " +
                             s"or SI table ENABLE is failed. ", ex)
            }
          } finally {
            // covers the scan above as well, so that segment locks taken before a failure
            // in the scan are not left behind
            segmentLocks.foreach {
              segmentLock => segmentLock.unlock()
            }
          }
        }
      } else {
        LOGGER.error(s"Didn't check failed segments for index [$indexTableName] as compaction " +
                     s"is progress on ${ carbonTable.getTableUniqueName }. " +
                     s"Please call SI repair again")
        if (!isLoadOrCompaction) {
          throw new ConcurrentOperationException(carbonTable.getDatabaseName,
            carbonTable.getTableName, "compaction", "reindex command")
        }
      }
    } finally {
      compactionLock.unlock()
    }
  }