def loadCarbonData()

in integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala [283:568]


  def loadCarbonData(
      sqlContext: SQLContext,
      carbonLoadModel: CarbonLoadModel,
      partitionStatus: SegmentStatus = SegmentStatus.SUCCESS,
      overwriteTable: Boolean,
      hadoopConf: Configuration,
      dataFrame: Option[DataFrame] = None,
      scanResultRdd : Option[RDD[InternalRow]] = None,
      updateModel: Option[UpdateTableModel] = None,
      operationContext: OperationContext): LoadMetadataDetails = {
    // Check if any load needs to be deleted before loading new data
    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    var status: Array[(String, (LoadMetadataDetails, ExecutionErrors))] = null
    var res: Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = null
    // accumulator to collect segment metadata
    val segmentMetaDataAccumulator = sqlContext
      .sparkContext
      .collectionAccumulator[Map[String, SegmentMetaDataInfo]]
    // create a new segment folder in the carbon store
    if (updateModel.isEmpty && carbonLoadModel.isCarbonTransactionalTable ||
        updateModel.isDefined) {
      CarbonLoaderUtil.checkAndCreateCarbonDataLocation(carbonLoadModel.getSegmentId, carbonTable)
    }
    var loadStatus = SegmentStatus.SUCCESS
    var errorMessage: String = "DataLoad failure"
    var executorMessage: String = ""
    val isSortTable = carbonTable.getNumberOfSortColumns > 0
    val sortScope = CarbonDataProcessorUtil.getSortScope(carbonLoadModel.getSortScope)

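    // lock this segment so that concurrent operations cannot modify it while the load is in progress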
    val segmentLock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
      CarbonTablePath.addSegmentPrefix(carbonLoadModel.getSegmentId) + LockUsage.LOCK)

    // dataFrame.get.rdd.isEmpty() will launch a job, so avoid calling it multiple times
    val isEmptyDataframe = updateModel.isDefined && dataFrame.get.rdd.isEmpty()
    try {
      if (!carbonLoadModel.isCarbonTransactionalTable || segmentLock.lockWithRetries()) {
        if (isEmptyDataframe) {
          // if there are no rows to be updated, mark the created segment as marked for delete and return
          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, "")
        } else {
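          // choose the load path based on the available input and the sort scope:
          // a pre-converted InternalRow RDD (insert flow), a DataFrame, or source data files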
          status = if (scanResultRdd.isDefined) {
            val colSchema = carbonLoadModel
              .getCarbonDataLoadSchema
              .getCarbonTable
              .getTableInfo
              .getFactTable
              .getListOfColumns
              .asScala
              .filterNot(col => col.isInvisible || col.isComplexColumn)
            val convertedRdd = CommonLoadUtils.getConvertedInternalRow(
              colSchema,
              scanResultRdd.get,
              isGlobalSortPartition = false)
            if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT) &&
                !carbonLoadModel.isNonSchemaColumnsPresent) {
              DataLoadProcessBuilderOnSpark.insertDataUsingGlobalSortWithInternalRow(sqlContext
                .sparkSession,
                convertedRdd,
                carbonLoadModel,
                hadoopConf,
                segmentMetaDataAccumulator)
            } else if (sortScope.equals(SortScopeOptions.SortScope.NO_SORT)) {
              loadDataFrameForNoSort(sqlContext,
                None,
                Some(convertedRdd),
                carbonLoadModel,
                segmentMetaDataAccumulator)
            } else {
              loadDataFrame(sqlContext,
                None,
                Some(convertedRdd),
                carbonLoadModel,
                segmentMetaDataAccumulator)
            }
          } else {
            if (dataFrame.isEmpty && isSortTable &&
                carbonLoadModel.getRangePartitionColumn != null &&
                (sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT) ||
                 sortScope.equals(SortScopeOptions.SortScope.LOCAL_SORT))) {
              DataLoadProcessBuilderOnSpark
                .loadDataUsingRangeSort(sqlContext.sparkSession,
                  carbonLoadModel,
                  hadoopConf,
                  segmentMetaDataAccumulator)
            } else if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
              DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkSession,
                dataFrame,
                carbonLoadModel,
                hadoopConf,
                segmentMetaDataAccumulator)
            } else if (dataFrame.isDefined) {
              loadDataFrame(sqlContext,
                dataFrame,
                None,
                carbonLoadModel,
                segmentMetaDataAccumulator)
            } else {
              loadDataFile(sqlContext, carbonLoadModel, hadoopConf, segmentMetaDataAccumulator)
            }
          }
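          // aggregate the per-partition statuses returned by the load job into a single segment status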
          val newStatusMap = scala.collection.mutable.Map.empty[String, SegmentStatus]
          if (status.nonEmpty) {
            status.foreach { eachLoadStatus =>
              val state = newStatusMap.get(eachLoadStatus._1)
              state match {
                case Some(SegmentStatus.LOAD_FAILURE) =>
                  newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
                case Some(SegmentStatus.LOAD_PARTIAL_SUCCESS)
                  if eachLoadStatus._2._1.getSegmentStatus ==
                     SegmentStatus.SUCCESS =>
                  newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
                case _ =>
                  newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
              }
            }

            newStatusMap.foreach {
              case (key, value) =>
                if (value == SegmentStatus.LOAD_FAILURE) {
                  loadStatus = SegmentStatus.LOAD_FAILURE
                } else if (value == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
                           loadStatus != SegmentStatus.LOAD_FAILURE) {
                  loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS
                }
            }
          } else {
            // if the load produced no status entries, treat an empty input (no partitions) as a
            // successful no-op load so that the data load flow can still complete
            if ((dataFrame.isDefined || scanResultRdd.isDefined) && updateModel.isEmpty) {
              if (dataFrame.isDefined) {
                val rdd = dataFrame.get.rdd
                if (rdd.partitions == null || rdd.partitions.length == 0) {
                  LOGGER.warn("DataLoading finished. No data was loaded.")
                  loadStatus = SegmentStatus.SUCCESS
                }
              } else {
                if (scanResultRdd.get.partitions == null ||
                    scanResultRdd.get.partitions.length == 0) {
                  LOGGER.warn("DataLoading finished. No data was loaded.")
                  loadStatus = SegmentStatus.SUCCESS
                }
              }
            } else {
              loadStatus = SegmentStatus.LOAD_FAILURE
            }
          }

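          // a partial-success partition status reported by the caller downgrades an otherwise successful load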
          if (loadStatus != SegmentStatus.LOAD_FAILURE &&
              partitionStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
            loadStatus = partitionStatus
          }
        }
      }
    } catch {
      case ex: Throwable =>
        loadStatus = SegmentStatus.LOAD_FAILURE
        val (extrMsgLocal, errorMsgLocal) = CarbonScalaUtil.retrieveAndLogErrorMsg(ex, LOGGER)
        executorMessage = extrMsgLocal
        errorMessage = errorMsgLocal
        LOGGER.info(errorMessage)
        LOGGER.error(ex)
    }
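    // commit phase: update the table status for the computed load status, or clean up on failure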
    var isLoadingCommitted = false
    try {
      val uniqueTableStatusId = Option(operationContext.getProperty("uuid")).getOrElse("")
        .asInstanceOf[String]
      if (loadStatus == SegmentStatus.LOAD_FAILURE) {
        // update the load entry in the table status file to change the status to marked for delete
        CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
        LOGGER.info("********starting clean up**********")
        if (carbonLoadModel.isCarbonTransactionalTable) {
          // deleting the segment is only applicable for transactional tables
          CarbonLoaderUtil.deleteSegmentForFailure(carbonLoadModel)
          clearIndexFiles(carbonTable, carbonLoadModel.getSegmentId)
        }
        LOGGER.info("********clean up done**********")
        LOGGER.warn("Cannot write load metadata file as data load failed")
        throw new Exception(errorMessage)
      } else {
        // check whether the data load failed due to bad records with the FAIL action configured,
        // and if so throw a data load failure exception
        if (loadStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
            status(0)._2._2.failureCauses == FailureCauses.BAD_RECORDS &&
            carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
          // update the load entry in the table status file to change the status to marked for delete
          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
          LOGGER.info("********starting clean up**********")
          if (carbonLoadModel.isCarbonTransactionalTable) {
            // deleting the segment is only applicable for transactional tables
            CarbonLoaderUtil.deleteSegmentForFailure(carbonLoadModel)
            clearIndexFiles(carbonTable, carbonLoadModel.getSegmentId)
          }
          LOGGER.info("********clean up done**********")
          throw new Exception(status(0)._2._2.errorMsg)
        }
        if (isEmptyDataframe) {
          return null
        }
        // as no records were loaded into the new segment, it should be marked for delete
        val newEntryLoadStatus =
          if (carbonLoadModel.isCarbonTransactionalTable &&
              !carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isMV &&
              !CarbonLoaderUtil.isValidSegment(carbonLoadModel,
                carbonLoadModel.getSegmentId.toInt)) {
            LOGGER.warn("Cannot write load metadata file as there is no data to load")
            SegmentStatus.MARKED_FOR_DELETE
          } else {
            loadStatus
          }

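        // collect the segment metadata gathered on the executors through the accumulator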
        val segmentMetaDataInfo = CommonLoadUtils.getSegmentMetaDataInfoFromAccumulator(
          carbonLoadModel.getSegmentId,
          segmentMetaDataAccumulator)
        segmentMetaDataAccumulator.reset()

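        // notify pre-status-update listeners and write the segment file before committing the table status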
        operationContext.setProperty(carbonTable.getTableUniqueName + "_Segment",
          carbonLoadModel.getSegmentId)
        val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
          new LoadTablePreStatusUpdateEvent(
            carbonTable.getCarbonTableIdentifier,
            carbonLoadModel)
        OperationListenerBus.getInstance()
          .fireEvent(loadTablePreStatusUpdateEvent, operationContext)
        val segmentFileName =
          SegmentFileStore.writeSegmentFile(carbonTable, carbonLoadModel.getSegmentId,
            String.valueOf(carbonLoadModel.getFactTimeStamp), segmentMetaDataInfo)
        val (done, writtenSegment) =
          updateTableStatus(
            sqlContext.sparkSession,
            status,
            carbonLoadModel,
            newEntryLoadStatus,
            overwriteTable,
            segmentFileName,
            updateModel,
            uniqueTableStatusId)
        val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
          new LoadTablePostStatusUpdateEvent(carbonLoadModel)
        val commitComplete = try {
          OperationListenerBus.getInstance()
            .fireEvent(loadTablePostStatusUpdateEvent, operationContext)
          true
        } catch {
          case ex: Exception =>
            LOGGER.error("Problem while committing indexes", ex)
            false
        }
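        // roll back the load if either the table status update or the post-status-update listeners failed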
        if (!done || !commitComplete) {
          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
          LOGGER.info("********starting clean up**********")
          if (carbonLoadModel.isCarbonTransactionalTable) {
            // deleting the segment is only applicable for transactional tables
            CarbonLoaderUtil.deleteSegmentForFailure(carbonLoadModel)
            // delete corresponding segment file from metadata
            val segmentFile =
              CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath) +
              File.separator + segmentFileName
            FileFactory.deleteFile(segmentFile)
            clearIndexFiles(carbonTable, carbonLoadModel.getSegmentId)
          }
          LOGGER.info("********clean up done**********")
          LOGGER.error("Data load failed due to failure in table status update.")
          throw new Exception("Data load failed due to failure in table status update.")
        }
        if (SegmentStatus.LOAD_PARTIAL_SUCCESS == loadStatus) {
          LOGGER.info("Data load is partially successful for " +
                      s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
        } else {
          LOGGER.info("Data load is successful for " +
                      s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
        }
        isLoadingCommitted = true
        writtenSegment
      }
    } finally {
      // Release the segment lock once the table status is finally updated
      segmentLock.unlock()
      if (isLoadingCommitted) {
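        // post-load triggers run only when the load has been successfully committed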
        triggerEventsAfterLoading(sqlContext,
          carbonLoadModel,
          hadoopConf,
          operationContext,
          updateModel.isDefined)
      }
    }
  }
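
A minimal sketch of how a caller (such as a load or insert command) might invoke this method,
assuming CarbonDataRDDFactory is the enclosing object and that the CarbonLoadModel and
OperationContext have already been built by the caller; the value names below are illustrative
only and not taken from the actual calling code:

  // hypothetical caller-side values: sqlContext, carbonLoadModel, inputDataFrame, operationContext
  val loadDetails: LoadMetadataDetails = CarbonDataRDDFactory.loadCarbonData(
    sqlContext = sqlContext,
    carbonLoadModel = carbonLoadModel,
    overwriteTable = false,
    hadoopConf = sqlContext.sparkContext.hadoopConfiguration,
    dataFrame = Some(inputDataFrame),   // None when loading directly from source files
    scanResultRdd = None,               // set by insert flows that carry an RDD[InternalRow]
    updateModel = None,                 // set only for update (IUD) flows
    operationContext = operationContext)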