def deleteDeltaExecutionInternal()

in integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala [94:355]


  def deleteDeltaExecutionInternal(
      databaseNameOp: Option[String],
      tableName: String,
      sparkSession: SparkSession,
      dataRdd: RDD[Row],
      timestamp: String,
      isUpdateOperation: Boolean,
      executorErrors: ExecutionErrors,
      tupleId: Option[Int] = None):
  (Array[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))]], BlockMappingVO) = {

    var res: Array[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))]] = null
    val database = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
    val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
    val tablePath = absoluteTableIdentifier.getTablePath

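    // For the update flow, project out only the implicit tupleId column from the incoming
    // rows; the plain delete flow uses dataRdd as-is.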
    val deleteRdd = if (isUpdateOperation) {
      val schema =
        org.apache.spark.sql.types.StructType(Seq(org.apache.spark.sql.types.StructField(
          CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID,
          org.apache.spark.sql.types.StringType)))
      val rdd = tupleId match {
        case Some(id) =>
          dataRdd
            .map(row => Row(row.get(id)))
        case _ =>
          dataRdd
            .map(row => Row(row.get(row.fieldIndex(
              CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))))
      }
      sparkSession.createDataFrame(rdd, schema).rdd
    } else {
      dataRdd
    }

    val (carbonInputFormat, job) = createCarbonInputFormat(absoluteTableIdentifier)
    CarbonInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo)
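    // Group the rows to be deleted by their segment/block key, derived from each row's tupleId.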
    val keyRdd = tupleId match {
      case Some(id) =>
        deleteRdd.map { row =>
          val tupleId: String = row.getString(id)
          val key = CarbonUpdateUtil.getSegmentWithBlockFromTID(tupleId,
            carbonTable.isHivePartitionTable)
          (key, row)
        }.groupByKey()
      case _ =>
        deleteRdd.map { row =>
          val tupleId: String = row
            .getString(row.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
          val key = CarbonUpdateUtil.getSegmentWithBlockFromTID(tupleId,
            carbonTable.isHivePartitionTable)
          (key, row)
        }.groupByKey()
    }

    // If no loads are present, there is nothing to do.
    if (keyRdd.partitions.length == 0) {
      return (Array.empty[List[(SegmentStatus,
        (SegmentUpdateDetails, ExecutionErrors, Long))]], null)
    }
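    // Collect per-block row counts and the block-to-segment mapping needed to validate the delete.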
    val blockMappingVO =
      carbonInputFormat.getBlockRowCount(
        job,
        carbonTable,
        CarbonFilters.getPartitions(
          Seq.empty,
          sparkSession,
          TableIdentifier(tableName, databaseNameOp)).map(_.asJava).orNull, true)
    val segmentUpdateStatusMngr = new SegmentUpdateStatusManager(carbonTable)
    CarbonUpdateUtil
      .createBlockDetailsMap(blockMappingVO, segmentUpdateStatusMngr)
    val metadataDetails = SegmentStatusManager.readTableStatusFile(
      CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath,
        carbonTable.getTableStatusVersion))
    val isStandardTable = CarbonUtil.isStandardCarbonTable(carbonTable)
    val rowContRdd =
      sparkSession.sparkContext.parallelize(
        blockMappingVO.getCompleteBlockRowDetailVO.asScala.toSeq,
        keyRdd.partitions.length)

    val conf = SparkSQLUtil
      .broadCastHadoopConf(sparkSession.sparkContext, sparkSession.sessionState.newHadoopConf())
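    // Join the per-block row-count details with the grouped delete rows, then build each
    // block's delete delta on the executors via deleteDeltaFunc.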
    val rdd = rowContRdd.join(keyRdd)
    val blockDetails = blockMappingVO.getBlockToSegmentMapping
    res = rdd.mapPartitionsWithIndex(
      (index: Int, records: Iterator[(String, (RowCountDetailsVO, Iterable[Row]))]) =>
        Iterator[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))]] {
          ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
          var result = List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))]()
          while (records.hasNext) {
            val (key, (rowCountDetailsVO, groupedRows)) = records.next()
            result = result ++
                     deleteDeltaFunc(index,
                       key,
                       groupedRows.toIterator,
                       timestamp,
                       rowCountDetailsVO,
                       isStandardTable,
                       metadataDetails
                         .find(_.getLoadName.equalsIgnoreCase(blockDetails.get(key)))
                         .get, carbonTable)
          }
          result
        }).collect()

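    // Builds the delete delta for a single block: validates each tupleId, records the deleted
    // (blocklet, page, offset) positions, writes the block's delete-delta file (or marks the
    // whole block for delete), and returns a single-element iterator with the resulting
    // SegmentUpdateDetails and status.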
    def deleteDeltaFunc(index: Int,
        key: String,
        iter: Iterator[Row],
        timestamp: String,
        rowCountDetailsVO: RowCountDetailsVO,
        isStandardTable: Boolean,
        load: LoadMetadataDetails, carbonTable: CarbonTable
    ): Iterator[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))] = {

      val result = new DeleteDeltaResultImpl()
      var deleteStatus = SegmentStatus.LOAD_FAILURE
      lazy val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
      // here key = segment/blockName
      var blockName = if (carbonTable.isHivePartitionTable) {
        key
      } else {
        key.split(CarbonCommonConstants.FILE_SEPARATOR)(1)
      }
      blockName = blockName.replace(CarbonCommonConstants.UNDERSCORE, CarbonTablePath.BATCH_PREFIX)
      blockName = CarbonUpdateUtil.getBlockName(CarbonTablePath.addDataPartPrefix(blockName))
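      // Accumulates the deleted-row positions (blocklet id, page id, offset) for this block.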
      val deleteDeltaBlockDetails: DeleteDeltaBlockDetails = new DeleteDeltaBlockDetails(blockName)
      val resultIter =
        new Iterator[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))] {
        val segmentUpdateDetails = new SegmentUpdateDetails()
        var TID = ""
        var countOfRows = 0
        try {
          while (iter.hasNext) {
            val oneRow = iter.next
            TID = oneRow
              .get(oneRow.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).toString
            val (offset, blockletId, pageId) = if (carbonTable.isHivePartitionTable) {
              (CarbonUpdateUtil.getRequiredFieldFromTID(TID,
                TupleIdEnum.OFFSET.getTupleIdIndex),
                CarbonUpdateUtil.getRequiredFieldFromTID(TID,
                  TupleIdEnum.BLOCKLET_ID.getTupleIdIndex),
                Integer.parseInt(CarbonUpdateUtil.getRequiredFieldFromTID(TID,
                  TupleIdEnum.PAGE_ID.getTupleIdIndex)))
            } else if (TID.contains("#/") && load.getPath != null) {
              // external segment case: the tuple id contains the external path, separated by '#'
              (CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.EXTERNAL_OFFSET),
                CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.EXTERNAL_BLOCKLET_ID),
                Integer.parseInt(CarbonUpdateUtil.getRequiredFieldFromTID(TID,
                  TupleIdEnum.EXTERNAL_PAGE_ID)))
            } else {
              (CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.OFFSET),
                CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.BLOCKLET_ID),
                Integer.parseInt(CarbonUpdateUtil.getRequiredFieldFromTID(TID,
                  TupleIdEnum.PAGE_ID)))
            }
            val isValidOffset = deleteDeltaBlockDetails.addBlocklet(blockletId, offset, pageId)
            // stop the delete operation if the same row is matched more than once
            if (!isValidOffset) {
              executorErrors.failureCauses = FailureCauses.MULTIPLE_INPUT_ROWS_MATCHING
              executorErrors.errorMsg = "Multiple input rows matched for same row."
              throw new MultipleMatchingException("Multiple input rows matched for same row.")
            }
            countOfRows = countOfRows + 1
          }

          val blockPath =
            if (StringUtils.isNotEmpty(load.getPath)) {
              load.getPath
            } else {
              CarbonUpdateUtil.getTableBlockPath(TID,
                tablePath,
                isStandardTable,
                carbonTable.isHivePartitionTable)
            }

          // get the compressor name
          var columnCompressor: String = carbonTable.getTableInfo
            .getFactTable
            .getTableProperties
            .get(CarbonCommonConstants.COMPRESSOR)
          if (null == columnCompressor) {
            columnCompressor = CompressorFactory.getInstance.getCompressor.getName
          }
          var blockNameFromTupleID =
            if (TID.contains("#/") && load.getPath != null) {
              CarbonUpdateUtil.getRequiredFieldFromTID(TID,
                TupleIdEnum.EXTERNAL_BLOCK_ID)
            } else {
              CarbonUpdateUtil.getRequiredFieldFromTID(TID,
                TupleIdEnum.BLOCK_ID)
            }
          blockNameFromTupleID = blockNameFromTupleID.replace(CarbonCommonConstants.UNDERSCORE,
            CarbonTablePath.BATCH_PREFIX)
          val completeBlockName = CarbonTablePath
            .addDataPartPrefix(
              blockNameFromTupleID + CarbonCommonConstants.POINT + columnCompressor +
              CarbonCommonConstants.FACT_FILE_EXT)
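          // Delete-delta file path for this block, suffixed with the operation timestamp.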
          val deleteDeltaPath = CarbonUpdateUtil
            .getDeleteDeltaFilePath(blockPath, blockName, timestamp)
          val carbonDeleteWriter = new CarbonDeleteDeltaWriterImpl(deleteDeltaPath)

          segmentUpdateDetails.setBlockName(blockName)
          segmentUpdateDetails.setActualBlockName(completeBlockName)
          segmentUpdateDetails.setSegmentName(load.getLoadName)
          segmentUpdateDetails.setDeleteDeltaEndTimestamp(timestamp)
          segmentUpdateDetails.setDeleteDeltaStartTimestamp(timestamp)

          val alreadyDeletedRows: Long = rowCountDetailsVO.getDeletedRowsInBlock
          val totalDeletedRows: Long = alreadyDeletedRows + countOfRows
          segmentUpdateDetails.setDeletedRowsInBlock(totalDeletedRows.toString)
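          // If every row in the block is now deleted, mark the whole block for delete;
          // otherwise persist the delete-delta file for just the deleted rows.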
          if (totalDeletedRows == rowCountDetailsVO.getTotalNumberOfRows) {
            segmentUpdateDetails.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
          } else {
            // write the delta file
            carbonDeleteWriter.write(deleteDeltaBlockDetails)
          }

          deleteStatus = SegmentStatus.SUCCESS
        } catch {
          case e : MultipleMatchingException =>
            LOGGER.error(e.getMessage)
          // don't throw exception here.
          case e: Exception =>
            val errorMsg = s"Delete data operation is failed for ${ database }.${ tableName }."
            LOGGER.error(errorMsg + e.getMessage)
            throw e
        }


        var finished = false
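        // `finished` makes this a single-element iterator: hasNext is true only on the first
        // call, so next() emits exactly one result tuple per block.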

        override def hasNext: Boolean = {
          if (!finished) {
            finished = true
            true
          } else {
            false
          }
        }

        override def next(): (SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long)) = {
          finished = true
          result.getKey(deleteStatus, (segmentUpdateDetails, executorErrors, countOfRows.toLong))
        }
      }
      resultIter
    }

    (res, blockMappingVO)
  }
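
A minimal invocation sketch, not taken from the source file: it assumes deleteDeltaExecutionInternal is called on the DeleteExecution object, that `spark` is an active SparkSession, that `deletedRowsRdd` is an RDD[Row] carrying the implicit tupleId column, and that ExecutionErrors is built from a FailureCauses value and a message; the table names and timestamp format shown are illustrative assumptions only.

  // Hypothetical caller-side wiring; all names below are placeholders.
  val executorErrors = ExecutionErrors(FailureCauses.NONE, "")
  val (segmentResults, blockMappingVO) = DeleteExecution.deleteDeltaExecutionInternal(
    databaseNameOp = Some("default"),
    tableName = "sales",
    sparkSession = spark,
    dataRdd = deletedRowsRdd,
    timestamp = System.currentTimeMillis().toString,
    isUpdateOperation = false,
    executorErrors = executorErrors)
  // segmentResults: per-block (SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long)) tuples;
  // blockMappingVO is null when there was nothing to delete.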