def main()

in integration/spark/src/main/scala/org/apache/carbondata/recovery/tablestatus/TableStatusRecovery.scala [40:206]


  def main(args: Array[String]): Unit = {
    // check the argument contains database name and tablename to recover table status file
    assert(args.length == 2)
    createCarbonSession()
    val sparkSession = SparkSQLUtil.getSparkSession
    val tableName = args(1)
    val databaseName = args(0)
    // get carbon table to start table status recovery
    val carbonTable = try {
      CarbonEnv.getCarbonTable(Some(databaseName), tableName)(sparkSession)
    } catch {
      case ex: Exception =>
        throw ex
    }

    if (carbonTable.isMV) {
      // not supported
      throw new UnsupportedOperationException("Unsupported operation on Materialized view table")
    }

    /**
     * 1. get the current table status version file name associated with carbon table
     * 2. Check if the current table status version file exists
     * 3. If does not exists, then read all the old table status version files and find the last
     *    recent version file and get the load metadata details. For the lost load metadata,
     *    read the segment files and table status update files to recover the lost
     *    load metadata entry and add it to previous version load metadata details list.
     * 4. Write the load metadata details list with version name as [Step:1]
     * */
    val tableStatusVersion = carbonTable.getTableStatusVersion
    val tableStatusPath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath,
      tableStatusVersion)
    val tableStatusFile = FileFactory.getCarbonFile(
      FileFactory.getUpdatedFilePath(tableStatusPath))
    if (!tableStatusFile.exists()) {
      // case where the current version table status file is lost, then get the previous table
      // status version file and update it as the current table status version
      val tableStatusFiles = CarbonScalaUtil.getTableStatusVersionFiles(carbonTable.getTablePath)
      // read the segment files in the Metadata directory
      val segmentFileDir = FileFactory.getCarbonFile(FileFactory.getUpdatedFilePath(
        CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath)))
      val segmentFiles = segmentFileDir.listFiles()
        .filter(segmentFile => segmentFile.getName.endsWith(CarbonTablePath.SEGMENT_EXT))
        .toList
      if (tableStatusFiles.isEmpty) {
        if (segmentFiles.isEmpty) {
          // no metadata found to recover table status file
          throw new Exception(
            "Segment Files does not exists to recover load metadata")
        }
      }
      // prepare segment to latest timestamp version map. This is required, in case of drop
      // partition, where there can be multiple segment files for same segment Id
      val segToTimeStampMap = new util.HashMap[String, String]()
      segmentFiles.foreach { segmentFile =>
        val segFileName = segmentFile.getName
        val segmentToTimestamp = segFileName.trim.split(CarbonCommonConstants.UNDERSCORE).toList
        if (!segToTimeStampMap.containsKey(segmentToTimestamp.head)) {
          segToTimeStampMap.put(segmentToTimestamp.head, segmentToTimestamp.last)
        } else {
          val timeStamp = segToTimeStampMap.get(segmentToTimestamp.head)
          if (timeStamp <= segmentToTimestamp.last) {
            segToTimeStampMap.put(segmentToTimestamp.head, segmentToTimestamp.last)
          }
        }
      }
      // iterate the available table status version files and find the most recent table status
      // version file
      val latestTableStatusVersionStr = CarbonScalaUtil.getLatestTblStatusVersionBasedOnTimestamp(
        tableStatusFiles)

      // read the load metadata details with the identified table status version file
      var loadMetaDetails = SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(
        carbonTable.getTablePath), latestTableStatusVersionStr).toList

      var updateMetaDetails: Array[SegmentUpdateDetails] = Array.empty

      val tableUpdateStatusFiles = FileFactory.getCarbonFile(CarbonTablePath.getMetadataPath(
        carbonTable.getTablePath)).listFiles(new CarbonFileFilter {
        override def accept(file: CarbonFile): Boolean = {
          file.getName.startsWith(CarbonCommonConstants.TABLEUPDATESTATUS_FILENAME)
        }
      })

      // if table has table update status files, iterate and identify the latest table status
      // update file
      if (tableUpdateStatusFiles.nonEmpty) {
        var latestTableUpdateStatusVersion = 0L
        tableUpdateStatusFiles.foreach { tableStatusFile =>
          val updateVersionTimeStamp = tableStatusFile.getName
            .substring(tableStatusFile.getName.indexOf(CarbonCommonConstants.HYPHEN) + 1,
              tableStatusFile.getName.length).toLong
          if (latestTableUpdateStatusVersion <= updateVersionTimeStamp) {
            latestTableUpdateStatusVersion = updateVersionTimeStamp
          }
        }
        updateMetaDetails = SegmentUpdateStatusManager.readLoadMetadata(
          CarbonCommonConstants.TABLEUPDATESTATUS_FILENAME + CarbonCommonConstants.HYPHEN +
          latestTableUpdateStatusVersion.toString, carbonTable.getTablePath)
      }

      // check which segment is missing from lost table status version
      val missedLoadMetaDetails: util.List[LoadMetadataDetails] =
        new util.ArrayList[LoadMetadataDetails]()
      segToTimeStampMap.asScala.foreach { segmentFileEntry =>
        val segmentFileName = segmentFileEntry._1 + CarbonCommonConstants.UNDERSCORE +
                              segmentFileEntry._2
        val segmentId = segmentFileEntry._1
        val segmentUpdateDetail = updateMetaDetails
          .filter(_.getSegmentName.equalsIgnoreCase(segmentId))
        // check if the segment Id from segment file entry exists in load metadata details list.
        // If does not exist, or if the segment file mapped to the load metadata entry and the
        // latest segment file timestamp is not same, then prepare new load metadata.
        if ((!loadMetaDetails.exists(_.getLoadName.equalsIgnoreCase(segmentId))
             || !loadMetaDetails.filter(_.getLoadName.equalsIgnoreCase(segmentId))
          .head.getSegmentFile.equalsIgnoreCase(segmentFileName)) &&
            !segmentId.contains(CarbonCommonConstants.POINT)) {
          val segFilePath = CarbonTablePath.getSegmentFilePath(
            carbonTable.getTablePath, segmentFileName)
          // read segment file and prepare load metadata
          val segmentFile = SegmentFileStore.readSegmentFile(segFilePath)
          val loadMetadataDetail = new LoadMetadataDetails()
          val segmentInfo = segmentFile.getLocationMap.asScala.head._2
          if (!segmentUpdateDetail.isEmpty) {
            loadMetadataDetail.setSegmentStatus(segmentUpdateDetail.head.getSegmentStatus)
            loadMetadataDetail.setModificationOrDeletionTimestamp(segmentUpdateDetail.head
              .getDeleteDeltaStartTimeAsLong)
          } else {
            loadMetadataDetail.setSegmentStatus(getSegmentStatus(segmentInfo.getStatus))
          }
          loadMetadataDetail.setLoadName(segmentId)
          loadMetadataDetail.setSegmentFile(segmentFileName)
          val dataIndexSize = CarbonUtil.getDataSizeAndIndexSize(carbonTable
            .getTablePath, new Segment(segmentId, segmentFileName))
          loadMetadataDetail.setDataSize(dataIndexSize
            .get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE).toString)
          loadMetadataDetail.setIndexSize(dataIndexSize
            .get(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE).toString)
          loadMetadataDetail.setLoadEndTime(FileFactory
            .getCarbonFile(segFilePath)
            .getLastModifiedTime)
          missedLoadMetaDetails.add(loadMetadataDetail)
          if (loadMetaDetails.exists(_.getLoadName.equalsIgnoreCase(segmentId))) {
            loadMetaDetails = loadMetaDetails.filterNot(_.getLoadName
              .equalsIgnoreCase(segmentId))
          }
        } else if (!segmentUpdateDetail.isEmpty) {
          // in case of Update/delete, update the already existing load metadata entry with the
          // latest segment update detail
          val loadMetadataDetail = loadMetaDetails
            .find(_.getLoadName.equalsIgnoreCase(segmentId))
            .head
          loadMetadataDetail.setSegmentStatus(segmentUpdateDetail.head.getSegmentStatus)
          loadMetadataDetail.setModificationOrDeletionTimestamp(segmentUpdateDetail.head
            .getDeleteDeltaStartTimeAsLong)
          loadMetaDetails = loadMetaDetails.filterNot(_.getLoadName.equalsIgnoreCase(segmentId))
          missedLoadMetaDetails.add(loadMetadataDetail)
        }
      }
      missedLoadMetaDetails.addAll(loadMetaDetails.asJava)
      // write new table status file with lost table status version name
      SegmentStatusManager.writeLoadDetailsIntoFile(CarbonTablePath.getTableStatusFilePath(
        carbonTable.getTablePath, tableStatusVersion),
        missedLoadMetaDetails.toArray(new Array[LoadMetadataDetails](missedLoadMetaDetails
          .size)))
    }
  }