def createSecondaryIndex()

in integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala [66:490]


  def createSecondaryIndex(secondaryIndexModel: SecondaryIndexModel,
    segmentToLoadStartTimeMap: java.util.Map[String, String],
    indexTable: CarbonTable,
    forceAccessSegment: Boolean = false,
    isCompactionCall: Boolean,
    isLoadToFailedSISegments: Boolean,
    isInsertOverwrite: Boolean = false):
  (CarbonTable, ListBuffer[ICarbonLock], OperationContext) = {
    var indexCarbonTable = indexTable
    val sc = secondaryIndexModel.sqlContext
    // get the thread pool size for secondary index creation
    val threadPoolSize = getThreadPoolSize(sc)
    LOGGER
      .info(s"Configured thread pool size for distributing segments in secondary index creation " +
            s"is $threadPoolSize")
    // create executor service to parallel run the segments
    val executorService = java.util.concurrent.Executors.newFixedThreadPool(threadPoolSize)
    if (null == indexCarbonTable) {
      // avoid an extra lookupRelation call to the table
      val metastore = CarbonEnv.getInstance(secondaryIndexModel.sqlContext.sparkSession)
        .carbonMetaStore
      indexCarbonTable = metastore
        .lookupRelation(Some(secondaryIndexModel.carbonLoadModel.getDatabaseName),
          secondaryIndexModel.secondaryIndex.indexName)(secondaryIndexModel.sqlContext
          .sparkSession).carbonTable
    }

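    // fire the pre-execution event so that listeners registered for SI load run before the
    // actual load starts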
    val operationContext = new OperationContext
    val loadTableSIPreExecutionEvent: LoadTableSIPreExecutionEvent =
      LoadTableSIPreExecutionEvent(secondaryIndexModel.sqlContext.sparkSession,
        new CarbonTableIdentifier(indexCarbonTable.getDatabaseName,
          indexCarbonTable.getTableName, ""),
        secondaryIndexModel.carbonLoadModel,
        indexCarbonTable)
    OperationListenerBus.getInstance
      .fireEvent(loadTableSIPreExecutionEvent, operationContext)

    var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
    val validSegments: java.util.List[String] = new util.ArrayList[String]()
    val skippedSegments: java.util.List[String] = new util.ArrayList[String]()
    var validSegmentList = List.empty[String]

    try {
      for (eachSegment <- secondaryIndexModel.validSegments) {
        // take the segment lock before starting the actual load, so that clearing the invalid
        // segments in a parallel load to the SI does not clear the segment currently being
        // loaded, and so that this new segment is not added as a failed segment in
        // SILoadEventListenerForFailedSegments, since the failed segments list
        // is validated based on the segment lock
        val segmentLock = CarbonLockFactory
          .getCarbonLockObj(indexCarbonTable.getAbsoluteTableIdentifier,
            CarbonTablePath.addSegmentPrefix(eachSegment) + LockUsage.LOCK)
        if (segmentLock.lockWithRetries(1, 0)) {
          segmentLocks += segmentLock
          // add only the segments for which the segment lock could be acquired and trigger
          // loading for them. Any skipped segments will be loaded later by
          // SILoadEventListenerForFailedSegments
          validSegments.add(eachSegment)
        } else {
          skippedSegments.add(eachSegment)
          LOGGER.error(s"Not able to acquire the segment lock for table" +
                       s" ${indexCarbonTable.getTableUniqueName} for segment: $eachSegment. " +
                       s"Skipping this segment from loading.")
        }
      }

      validSegmentList = validSegments.asScala.toList
      if (validSegmentList.isEmpty) {
        return (indexCarbonTable, segmentLocks, operationContext)
      }

      LOGGER.info(s"${indexCarbonTable.getTableUniqueName}: SI loading is started " +
              s"for segments: $validSegmentList")
      val segmentStatus = if (isInsertOverwrite) {
        SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
      } else {
        SegmentStatus.INSERT_IN_PROGRESS
      }
      FileInternalUtil
        .updateTableStatus(validSegmentList,
          secondaryIndexModel.carbonLoadModel,
          secondaryIndexModel.secondaryIndex.indexName,
          segmentStatus,
          secondaryIndexModel.segmentIdToLoadStartTimeMapping,
          new java.util.HashMap[String, String](),
          indexCarbonTable,
          sc.sparkSession)
      CarbonHiveIndexMetadataUtil.updateTableStatusVersion(indexCarbonTable,
        sc.sparkSession,
        secondaryIndexModel.carbonLoadModel.getLatestTableStatusWriteVersion)
      var execInstance = "1"
      // in case of non-dynamic executor allocation, the number of executors is fixed.
      if (sc.sparkContext.getConf.contains("spark.executor.instances")) {
        execInstance = sc.sparkContext.getConf.get("spark.executor.instances")
        LOGGER.info("spark.executor.instances property is set to =" + execInstance)
      }
      // in case of dynamic executor allocation, take the maximum executors
      // configured for the dynamic allocation.
      else if (sc.sparkContext.getConf.contains("spark.dynamicAllocation.enabled")) {
        if (sc.sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
          .equalsIgnoreCase("true")) {
          execInstance = sc.sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
          LOGGER.info("spark.dynamicAllocation.maxExecutors property is set to =" + execInstance)
        }
      }
      var successSISegments: List[String] = List()
      var failedSISegments: List[String] = List()
      val carbonLoadModel: CarbonLoadModel = getCopyObject(secondaryIndexModel)
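      // choose the load strategy based on the SI table's sort scope: for global_sort the SI
      // segments are loaded by querying the main table into a DataFrame and running the
      // global-sort data load flow; otherwise CarbonSecondaryIndexRDD is used per segment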
      val sort_scope = indexCarbonTable.getTableInfo.getFactTable.getTableProperties
        .get("sort_scope")
      if (sort_scope != null && sort_scope.equalsIgnoreCase("global_sort")) {
        val mainTable = secondaryIndexModel.carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
        var futureObjectList = List[java.util.concurrent.Future[Array[(String,
          (LoadMetadataDetails, ExecutionErrors))]]]()
        for (eachSegment <- validSegmentList) {
          futureObjectList :+= executorService
            .submit(new Callable[Array[(String, (LoadMetadataDetails, ExecutionErrors))]] {
              @throws(classOf[Exception])
              override def call(): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
                // to load a global-sort SI segment, the main table must be queried
                // along with the position reference projection
                val projections = indexCarbonTable.getCreateOrderColumn
                  .asScala
                  .map(_.getColName)
                  .filterNot(_.equalsIgnoreCase(CarbonCommonConstants.POSITION_REFERENCE)).toSet
                val explodeColumn = mainTable.getCreateOrderColumn.asScala
                  .filter(x => x.getDataType.isComplexType &&
                               projections.contains(x.getColName))
                // At this point the SI column data is read from the main table, so
                // positionReference must be calculated, because the SI stores the SI columns
                // plus positionReference.
                var dataFrame = dataFrameOfSegments(sc.sparkSession,
                  mainTable,
                  projections.mkString(","),
                  Array(eachSegment),
                  isPositionReferenceRequired = true, explodeColumn.nonEmpty)
                // flatten (explode) the complex column for the SI
                if (explodeColumn.nonEmpty) {
                  val columns = dataFrame.schema.map { x =>
                    if (x.name.equals(explodeColumn.head.getColName)) {
                      functions.explode_outer(functions.col(x.name))
                    } else {
                      functions.col(x.name)
                    }
                  }
                  dataFrame = dataFrame.select(columns: _*)
                }
                val dataLoadSchema = new CarbonDataLoadSchema(indexCarbonTable)
                carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
                carbonLoadModel.setTableName(indexCarbonTable.getTableName)
                carbonLoadModel.setDatabaseName(indexCarbonTable.getDatabaseName)
                carbonLoadModel.setTablePath(indexCarbonTable.getTablePath)
                carbonLoadModel.setFactTimeStamp(secondaryIndexModel
                  .segmentIdToLoadStartTimeMapping(eachSegment))
                carbonLoadModel.setSegmentId(eachSegment)
                var result: Array[(String, (LoadMetadataDetails, ExecutionErrors))] = null
                try {
                  val configuration = FileFactory.getConfiguration
                  configuration.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, eachSegment)
                  val currentSegmentFileName = if (mainTable.isHivePartitionTable) {
                    eachSegment + CarbonCommonConstants.UNDERSCORE +
                    carbonLoadModel.getFactTimeStamp
                  } else {
                    null
                  }
                  findCarbonScanRDD(dataFrame.rdd, currentSegmentFileName)
                  // accumulator to collect segment metadata
                  val segmentMetaDataAccumulator = sc.sparkSession.sqlContext
                    .sparkContext
                    .collectionAccumulator[Map[String, SegmentMetaDataInfo]]
                  // TODO: use new insert into flow, instead of DataFrame prepare RDD[InternalRow]
                  result = DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
                    sc.sparkSession,
                    Some(dataFrame),
                    carbonLoadModel,
                    hadoopConf = configuration, segmentMetaDataAccumulator)
                }
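                // record the load start time used for this segment so that the caller can
                // update the table status with it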
                segmentToLoadStartTimeMap
                  .put(eachSegment, String.valueOf(carbonLoadModel.getFactTimeStamp))
                result
              }
            })
        }
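        // wait for all the segment loads and group the futures by the resulting segment
        // status to separate the successful and failed segments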
        val segmentSecondaryIndexCreationStatus = futureObjectList.filter(_.get().length > 0)
          .groupBy(a => a.get().head._2._1.getSegmentStatus)
        val hasSuccessSegments =
          segmentSecondaryIndexCreationStatus.contains(SegmentStatus.LOAD_PARTIAL_SUCCESS) ||
          segmentSecondaryIndexCreationStatus.contains(SegmentStatus.SUCCESS)
        val hasFailedSegments = segmentSecondaryIndexCreationStatus
          .contains(SegmentStatus.MARKED_FOR_DELETE)
        if (hasSuccessSegments) {
          successSISegments =
            segmentSecondaryIndexCreationStatus(SegmentStatus.SUCCESS).collect {
              case segments: java.util.concurrent.Future[Array[(String, (LoadMetadataDetails,
                ExecutionErrors))]] =>
                segments.get().head._2._1.getLoadName
            }
        }
        if (hasFailedSegments) {
          // if the call is from compaction, the main table compaction must also be failed.
          // If the load is called from SILoadEventListener (for the corresponding main table
          // segment) and the SI load fails, the main table load must also fail, so throw an
          // exception.
          // If the load is called from SI creation or SILoadEventListenerForFailedSegments,
          // there is no need to fail; just mark the segment as marked for delete, so that the
          // next load to the main table will take care of it
          if (isCompactionCall || !isLoadToFailedSISegments) {
            throw new Exception("Secondary index creation failed")
          } else {
            failedSISegments =
              segmentSecondaryIndexCreationStatus(SegmentStatus.MARKED_FOR_DELETE).collect {
                case segments: java.util.concurrent.Future[Array[(String, (LoadMetadataDetails,
                  ExecutionErrors))]] =>
                  segments.get().head._1
              }
          }
        }
      } else {
        var futureObjectList = List[java.util.concurrent.Future[Array[(String, Boolean)]]]()
        for (eachSegment <- validSegmentList) {
          val segId = eachSegment
          futureObjectList :+= executorService.submit(new Callable[Array[(String, Boolean)]] {
            @throws(classOf[Exception])
            override def call(): Array[(String, Boolean)] = {
              ThreadLocalSessionInfo.getOrCreateCarbonSessionInfo().getNonSerializableExtraInfo
                .put("carbonConf", SparkSQLUtil.sessionState(sc.sparkSession).newHadoopConf())
              var eachSegmentSecondaryIndexCreationStatus: Array[(String, Boolean)] = Array.empty
              CarbonLoaderUtil.checkAndCreateCarbonDataLocation(segId, indexCarbonTable)
              carbonLoadModel
                .setFactTimeStamp(secondaryIndexModel.segmentIdToLoadStartTimeMapping(eachSegment))
              carbonLoadModel.setTablePath(secondaryIndexModel.carbonTable.getTablePath)
              val secondaryIndexCreationStatus = new CarbonSecondaryIndexRDD(sc.sparkSession,
                new SecondaryIndexCreationResultImpl,
                carbonLoadModel,
                secondaryIndexModel.secondaryIndex,
                segId, execInstance, indexCarbonTable, forceAccessSegment).collect()
              segmentToLoadStartTimeMap.put(segId, carbonLoadModel.getFactTimeStamp.toString)
              if (secondaryIndexCreationStatus.length > 0) {
                eachSegmentSecondaryIndexCreationStatus = secondaryIndexCreationStatus
              }
              eachSegmentSecondaryIndexCreationStatus
            }
          })
        }
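        // wait for all the segment loads and group the futures by their boolean creation status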
        val segmentSecondaryIndexCreationStatus = futureObjectList.filter(_.get().length > 0)
          .groupBy(a => a.get().head._2)
        val hasSuccessSegments = segmentSecondaryIndexCreationStatus.contains(true)
        val hasFailedSegments = segmentSecondaryIndexCreationStatus.contains(false)
        if (hasSuccessSegments) {
          successSISegments =
            segmentSecondaryIndexCreationStatus("true".toBoolean).collect {
              case segments: java.util.concurrent.Future[Array[(String, Boolean)]] =>
                segments.get().head._1
            }
        }
        if (hasFailedSegments) {
          // if the call is from compaction, the main table compaction must also be failed.
          // If the load is called from SILoadEventListener (for the corresponding main table
          // segment) and the SI load fails, the main table load must also fail, so throw an
          // exception.
          // If the load is called from SI creation or SILoadEventListenerForFailedSegments,
          // there is no need to fail; just mark the segment as marked for delete, so that the
          // next load to the main table will take care of it
          if (isCompactionCall || !isLoadToFailedSISegments) {
            throw new Exception("Secondary index creation failed")
          } else {
            failedSISegments =
              segmentSecondaryIndexCreationStatus("false".toBoolean).collect {
                case segments: java.util.concurrent.Future[Array[(String, Boolean)]] =>
                  segments.get().head._1
              }
          }
        }
      }
      // only the segments for which the load failed need their status set to marked
      // for delete; the remaining segments stay as SUCCESS
      var tableStatusUpdateForSuccess = false
      var tableStatusUpdateForFailure = false

      if (successSISegments.nonEmpty && !isCompactionCall) {
        // merge index files for the successful segments, only in the load flow
        CarbonMergeFilesRDD.mergeIndexFiles(secondaryIndexModel.sqlContext.sparkSession,
          successSISegments,
          segmentToLoadStartTimeMap,
          indexCarbonTable.getTablePath,
          indexCarbonTable, mergeIndexProperty = false)

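        // read the table status entries of the successfully loaded segments so that their
        // data files can be merged below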
        val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
          indexCarbonTable.getMetadataPath, indexCarbonTable.getTableStatusVersion)
          .filter(loadMetadataDetail => successSISegments.contains(loadMetadataDetail.getLoadName))

        val carbonLoadModelForMergeDataFiles = SecondaryIndexUtil
          .getCarbonLoadModel(indexCarbonTable,
            loadMetadataDetails.toList.asJava,
            System.currentTimeMillis(),
            CarbonIndexUtil
              .getCompressorForIndexTable(indexCarbonTable, secondaryIndexModel.carbonTable))
        carbonLoadModelForMergeDataFiles.setLatestTableStatusWriteVersion(secondaryIndexModel
          .carbonLoadModel.getLatestTableStatusWriteVersion)

        // merge the data files of the loaded segments and, if needed, also merge
        // the index files as part of this step
        val rebuiltSegments = SecondaryIndexUtil
          .mergeDataFilesSISegments(secondaryIndexModel.segmentIdToLoadStartTimeMapping,
            indexCarbonTable,
            loadMetadataDetails.toList.asJava, carbonLoadModelForMergeDataFiles)(sc)

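        // in the insert overwrite flow, all pre-existing segments that are not part of this
        // load are marked for delete so that only the newly overwritten segments remain valid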
        if (isInsertOverwrite) {
          val overriddenSegments = SegmentStatusManager.readLoadMetadata(
            indexCarbonTable.getMetadataPath, indexCarbonTable.getTableStatusVersion)
            .filter(loadMetadata => !successSISegments.contains(loadMetadata.getLoadName))
            .map(_.getLoadName).toList
          FileInternalUtil
            .updateTableStatus(
              overriddenSegments,
              secondaryIndexModel.carbonLoadModel,
              secondaryIndexModel.secondaryIndex.indexName,
              SegmentStatus.MARKED_FOR_DELETE,
              secondaryIndexModel.segmentIdToLoadStartTimeMapping,
              segmentToLoadStartTimeMap,
              indexTable,
              secondaryIndexModel.sqlContext.sparkSession)
        }
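        // if the data-file merge did not rebuild any segment, write the segment files and
        // mark the loaded segments as SUCCESS here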
        if (rebuiltSegments.isEmpty) {
          for (loadMetadata <- loadMetadataDetails) {
            SegmentFileStore
              .writeSegmentFile(indexCarbonTable, loadMetadata.getLoadName,
                String.valueOf(loadMetadata.getLoadStartTime))
          }
          tableStatusUpdateForSuccess = FileInternalUtil.updateTableStatus(
            successSISegments,
            secondaryIndexModel.carbonLoadModel,
            secondaryIndexModel.secondaryIndex.indexName,
            SegmentStatus.SUCCESS,
            secondaryIndexModel.segmentIdToLoadStartTimeMapping,
            segmentToLoadStartTimeMap,
            indexCarbonTable,
            secondaryIndexModel.sqlContext.sparkSession,
            carbonLoadModelForMergeDataFiles.getFactTimeStamp,
            rebuiltSegments)
        }
      }

      if (!isCompactionCall) {
        // Index PrePriming for SI
        DistributedRDDUtils.triggerPrepriming(secondaryIndexModel.sqlContext.sparkSession,
          indexCarbonTable,
          Seq(),
          operationContext,
          FileFactory.getConfiguration,
          validSegments.asScala.toList)
      }

      // if the data load fails, update the status of all the failed segments to marked for
      // delete, so that the next SI load, triggered in the post event of the main table data
      // load, clears all the marked-for-delete segments and re-triggers the load for the same
      // segments in that event
      if (failedSISegments.nonEmpty && !isCompactionCall) {
        tableStatusUpdateForFailure = FileInternalUtil.updateTableStatus(
          failedSISegments,
          secondaryIndexModel.carbonLoadModel,
          secondaryIndexModel.secondaryIndex.indexName,
          SegmentStatus.MARKED_FOR_DELETE,
          secondaryIndexModel.segmentIdToLoadStartTimeMapping,
          segmentToLoadStartTimeMap,
          indexCarbonTable,
          secondaryIndexModel.sqlContext.sparkSession)
      }

      if (failedSISegments.nonEmpty) {
        LOGGER.error("Dataload to secondary index creation has failed")
      }

      CarbonHiveIndexMetadataUtil.updateTableStatusVersion(indexCarbonTable,
        secondaryIndexModel.sqlContext.sparkSession,
        secondaryIndexModel.carbonLoadModel.getLatestTableStatusWriteVersion)

      if (!isCompactionCall) {
        val loadTableSIPostExecutionEvent: LoadTableSIPostExecutionEvent =
          LoadTableSIPostExecutionEvent(sc.sparkSession,
            indexCarbonTable.getCarbonTableIdentifier,
            secondaryIndexModel.carbonLoadModel,
            indexCarbonTable)
        OperationListenerBus.getInstance
          .fireEvent(loadTableSIPostExecutionEvent, operationContext)
      }

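      // in the compaction flow, return the acquired segment locks so that the caller can
      // release them after compaction completes; in the load flow they are released in finally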
      if (isCompactionCall) {
        (indexCarbonTable, segmentLocks, operationContext)
      } else {
        (indexCarbonTable, ListBuffer.empty, operationContext)
      }
    } catch {
      case ex: Exception =>
        LOGGER.error("Load to SI table failed", ex)
        if (isCompactionCall) {
          segmentLocks.foreach(segmentLock => segmentLock.unlock())
        }
        FileInternalUtil
          .updateTableStatus(validSegmentList,
            secondaryIndexModel.carbonLoadModel,
            secondaryIndexModel.secondaryIndex.indexName,
            SegmentStatus.MARKED_FOR_DELETE,
            secondaryIndexModel.segmentIdToLoadStartTimeMapping,
            new java.util.HashMap[String, String](),
            indexCarbonTable,
            sc.sparkSession)
        throw ex
    } finally {
      // close the executor service
      if (null != executorService) {
        executorService.shutdownNow()
      }

      // release the segment locks only for load flow
      if (!isCompactionCall) {
        segmentLocks.foreach(segmentLock => {
          segmentLock.unlock()
        })
      }
    }
  }