private def writeMetaForSegment()

in integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala [227:406]


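  /**
   * Writes the metadata that makes an externally added segment visible to
   * queries: a table status entry, a segment file and, when delete delta files
   * are present in the segment path, reconstructed update metadata.
   */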
  private def writeMetaForSegment(
      sparkSession: SparkSession,
      carbonTable: CarbonTable,
      segmentPath: String,
      partitionSpecOp: Option[CarbonPartitionSpec] = None,
      partitionDataFiles: Seq[FileStatus] = Seq.empty
  ): Unit = {
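    // Build a minimal load model carrying the identity of the target table; the
    // status writes and events below read table info and timestamps from it.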
    val model = new CarbonLoadModel
    model.setCarbonTransactionalTable(true)
    model.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
    model.setDatabaseName(carbonTable.getDatabaseName)
    model.setTableName(carbonTable.getTableName)
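    // Fire pre-load events so that indexes and other listeners can prepare; the
    // returned handles are reused for the post-load events at the end.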
    val operationContext = new OperationContext
    operationContext.setProperty("isLoadOrCompaction", false)
    val (tableIndexes, indexOperationContext) = CommonLoadUtils.firePreLoadEvents(sparkSession,
      model,
      "",
      segmentPath,
      options.asJava,
      options.asJava,
      false,
      false,
      None,
      operationContext)

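    // Create the table status entry for the added segment: mark it
    // INSERT_IN_PROGRESS, remember its external path, and record the file
    // format when it is not carbon (e.g. parquet or orc).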
    val newLoadMetaEntry = new LoadMetadataDetails
    model.setFactTimeStamp(CarbonUpdateUtil.readCurrentTime)
    CarbonLoaderUtil.populateNewLoadMetaEntry(newLoadMetaEntry,
      SegmentStatus.INSERT_IN_PROGRESS,
      model.getFactTimeStamp,
      false)
    newLoadMetaEntry.setPath(segmentPath)
    val format = options.getOrElse("format", "carbondata")
    val isCarbonFormat = format.equalsIgnoreCase("carbondata") ||
                         format.equalsIgnoreCase("carbon")
    if (!isCarbonFormat) {
      newLoadMetaEntry.setFileFormat(new FileFormat(format))
    }
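    // The added segment may already contain delete delta files; if so, the update
    // metadata (valid delta files and deleted row counts per block) has to be
    // reconstructed before the segment is registered.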
    val deltaFiles = FileFactory.getCarbonFile(segmentPath).listFiles(new CarbonFileFilter() {
      override def accept(file: CarbonFile): Boolean = file.getName
        .endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
    })
    val updateTimestamp = System.currentTimeMillis().toString
    var isUpdateStatusRequired = false
    if (deltaFiles.nonEmpty) {
      LOGGER.warn("Adding a modified load to the table. If there is any updated segment for this" +
        "load, please add updated segment also.")
      val blockNameToDeltaFilesMap =
        collection.mutable.Map[String, collection.mutable.ListBuffer[(CarbonFile, String)]]()
      deltaFiles.foreach { deltaFile =>
        val tmpDeltaFilePath = deltaFile.getAbsolutePath
          .replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR,
            CarbonCommonConstants.FILE_SEPARATOR)
        val deltaFilePathElements = tmpDeltaFilePath.split(CarbonCommonConstants.FILE_SEPARATOR)
        if (deltaFilePathElements != null && deltaFilePathElements.nonEmpty) {
          val deltaFileName = deltaFilePathElements(deltaFilePathElements.length - 1)
          val blockName = CarbonTablePath.DataFileUtil
            .getBlockNameFromDeleteDeltaFile(deltaFileName)
          if (blockNameToDeltaFilesMap.contains(blockName)) {
            blockNameToDeltaFilesMap(blockName) += ((deltaFile, deltaFileName))
          } else {
            val deltaFileList = new ListBuffer[(CarbonFile, String)]()
            deltaFileList += ((deltaFile, deltaFileName))
            blockNameToDeltaFilesMap.put(blockName, deltaFileList)
          }
        }
      }
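      // Build one SegmentUpdateDetails entry per block, reconstructing the actual
      // data file name from the block name, the compressor and the carbondata
      // extension.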
      val segmentUpdateDetails = new util.ArrayList[SegmentUpdateDetails]()
      val columnCompressor = CompressorFactory.getInstance.getCompressor.getName
      blockNameToDeltaFilesMap.foreach { entry =>
        val segmentUpdateDetail = new SegmentUpdateDetails()
        segmentUpdateDetail.setBlockName(entry._1)
        segmentUpdateDetail.setActualBlockName(
          entry._1 + CarbonCommonConstants.POINT + columnCompressor +
            CarbonCommonConstants.FACT_FILE_EXT)
        segmentUpdateDetail.setSegmentName(model.getSegmentId)
        val blockNameElements = entry._1.split(CarbonCommonConstants.HYPHEN)
        if (blockNameElements != null && blockNameElements.nonEmpty) {
          val segmentId = blockNameElements(blockNameElements.length - 1)
          // For SDK-written files the segment ID in the block name is the literal "null"
          if (segmentId.equals("null")) {
            readAllDeltaFiles(entry._2, segmentUpdateDetail)
          } else {
            setValidDeltaFileAndDeletedRowCount(entry._2, segmentUpdateDetail)
          }
        }
        segmentUpdateDetails.add(segmentUpdateDetail)
      }
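      // Persist the reconstructed update metadata with a fresh timestamp and stamp
      // the new load entry with the same update delta start/end timestamps.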
      CarbonUpdateUtil.updateSegmentStatus(segmentUpdateDetails,
        carbonTable,
        updateTimestamp,
        false,
        true)
      isUpdateStatusRequired = true
      newLoadMetaEntry.setUpdateDeltaStartTimestamp(updateTimestamp)
      newLoadMetaEntry.setUpdateDeltaEndTimestamp(updateTimestamp)
    }

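    // Persist the INSERT_IN_PROGRESS entry into the table status file, updating
    // the update status as well when delete delta files were found.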
    CarbonLoaderUtil.recordNewLoadMetadata(newLoadMetaEntry, model, true, false, updateTimestamp,
      isUpdateStatusRequired)
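    // Describe the new segment: a freshly generated segment file name plus the
    // user-supplied options (path, format, ...) carried as segment properties.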
    val segment = new Segment(
      model.getSegmentId,
      SegmentFileStore.genSegmentFileName(
        model.getSegmentId,
        System.nanoTime().toString) + CarbonTablePath.SEGMENT_EXT,
      segmentPath,
      new util.HashMap[String, String](options.asJava))
    // Merge the index files of the segment; this is only applicable to the carbon format
    if (isCarbonFormat) {
      CarbonLoaderUtil.mergeIndexFilesInAddLoadSegment(carbonTable,
        model.getSegmentId,
        segmentPath,
        model.getFactTimeStamp.toString)
      // clear Block index Cache
      SegmentFileStore.clearBlockIndexCache(carbonTable, model.getSegmentId)
    }
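    // Write the segment file mapping the segment to its physical data files;
    // non-carbon formats also record the partition spec and partition data files.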
    val writeSegment =
      if (isCarbonFormat) {
        SegmentFileStore.writeSegmentFile(carbonTable, segment)
      } else {
        SegmentFileStore.writeSegmentFileForOthers(
          carbonTable, segment, partitionSpecOp.orNull, partitionDataFiles.asJava)
      }

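    // Promote the segment from INSERT_IN_PROGRESS to SUCCESS in the table
    // status file, but only if the segment file was written successfully.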
    val success = if (writeSegment) {
      SegmentFileStore.updateTableStatusFile(
        carbonTable,
        model.getSegmentId,
        segment.getSegmentFileName,
        carbonTable.getCarbonTableIdentifier.getTableId,
        new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName),
        SegmentStatus.SUCCESS,
        model.getLatestTableStatusWriteVersion)
    } else {
      false
    }

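    // Sync the latest table status version into the Hive table metadata.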
    CarbonHiveIndexMetadataUtil.updateTableStatusVersion(carbonTable,
      sparkSession,
      model.getLatestTableStatusWriteVersion)

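    // Fire the post-status-update event so that listeners (e.g. secondary index
    // commit) can finish their work; a failure here fails the whole add load.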
    val postExecutionEvent = if (success) {
      val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
        new LoadTablePostStatusUpdateEvent(model)
      val commitComplete = try {
        OperationListenerBus.getInstance()
          .fireEvent(loadTablePostStatusUpdateEvent, operationContext)
        true
      } catch {
        case ex: Exception =>
          LOGGER.error("Problem while committing indexes", ex)
          false
      }
      commitComplete
    } else {
      success
    }

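    // If the status update or the post-update event failed, roll back: mark the
    // load as failed, remove the segment data and its segment file, clear the
    // index files, and raise the failure.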
    if (!postExecutionEvent || !success) {
      CarbonLoaderUtil.updateTableStatusForFailure(model, "uniqueTableStatusId")
      LOGGER.info("********starting clean up**********")
      // delete segment is applicable for transactional table
      CarbonLoaderUtil.deleteSegmentForFailure(model)
      // delete corresponding segment file from metadata
      val segmentFile = CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath) +
                        File.separator + segment.getSegmentFileName
      FileFactory.deleteFile(segmentFile)
      clearIndexFiles(carbonTable, model.getSegmentId)
      LOGGER.info("********clean up done**********")
      LOGGER.error("Data load failed due to failure in table status update.")
      throw new Exception("Data load failed due to failure in table status update.")
    }
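    // MVs on this table may no longer reflect its data after an externally added
    // segment, so disable them, then fire the post-load events captured earlier.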
    MVManagerInSpark.disableMVOnTable(sparkSession, carbonTable)
    CommonLoadUtils.firePostLoadEvents(sparkSession,
      model,
      tableIndexes,
      indexOperationContext,
      carbonTable,
      operationContext)
  }
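
For orientation, a minimal sketch of how this code path is typically reached. The table name and segment path are illustrative assumptions, not taken from the source; writeMetaForSegment itself runs inside CarbonAddLoadCommand when an ADD SEGMENT statement is executed on a CarbonData-enabled SparkSession.

// Illustrative sketch: identifiers and paths below are assumptions.
import org.apache.spark.sql.SparkSession

object AddSegmentExample {
  def main(args: Array[String]): Unit = {
    // A session with the CarbonData extensions configured is assumed;
    // plain Spark SQL does not recognise ADD SEGMENT.
    val spark = SparkSession.builder()
      .appName("add-segment-example")
      .enableHiveSupport()
      .getOrCreate()

    // Registers an existing folder of data files as a new segment of the table.
    // 'format' defaults to "carbondata" (see writeMetaForSegment); for an
    // external format such as parquet the segment is recorded with that
    // FileFormat instead.
    spark.sql(
      """ALTER TABLE sales ADD SEGMENT
        |OPTIONS('path'='hdfs://nn/warehouse/sales_seg1', 'format'='carbondata')
        |""".stripMargin)
  }
}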