override def processData()

in integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala [59:286]


  override def processData(sparkSession: SparkSession): Seq[Row] = {
    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
    var updatedRowCount = 0L
    IUDCommonUtil.checkIfSegmentListIsSet(sparkSession, plan)
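    // Nothing to update unless the plan actually references a CarbonDatasourceHadoopRelation.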
    val res = plan find {
      case relation: LogicalRelation if relation.relation
        .isInstanceOf[CarbonDatasourceHadoopRelation] =>
        true
      case _ => false
    }

    if (res.isEmpty) {
      return Array(Row(updatedRowCount)).toSeq
    }
    var carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
    setAuditTable(carbonTable)
    setAuditInfo(Map("plan" -> plan.prettyJson))
    // Do not allow spatial index and its source columns to be updated.
    AlterTableUtil.validateColumnsWithSpatialIndexProperties(carbonTable, columns)
    columns.foreach { col =>
      val dataType = carbonTable.getColumnByName(col).getColumnSchema.getDataType
      if (dataType.isComplexType) {
        throw new UnsupportedOperationException("Unsupported operation on Complex data type")
      }

    }
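    // Updates are blocked on non-transactional tables, while a load is in progress,
    // when an index disallows update, and when the table holds non-carbon format segments.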
    if (!carbonTable.getTableInfo.isTransactionalTable) {
      throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
    }
    if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
      throw new ConcurrentOperationException(carbonTable, "loading", "data update")
    }

    if (!carbonTable.canAllow(carbonTable, TableOperation.UPDATE)) {
      throw new MalformedCarbonCommandException(
        "update operation is not supported for index")
    }

    // Block the update operation for non-carbon formats
    if (MixedFormatHandler.otherFormatSegmentsExist(carbonTable.getMetadataPath,
      carbonTable.getTableStatusVersion)) {
      throw new MalformedCarbonCommandException(
        s"Unsupported update operation on table containing mixed format segments")
    }

    // trigger event for Update table
    val operationContext = new OperationContext
    val updateTablePreEvent: UpdateTablePreEvent =
      UpdateTablePreEvent(sparkSession, carbonTable)
    operationContext.setProperty("isLoadOrCompaction", false)
    OperationListenerBus.getInstance.fireEvent(updateTablePreEvent, operationContext)
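    // Create the metadata, compaction and update lock handles; they are acquired before the
    // delete/insert steps below and released in the finally block.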
    val metadataLock = CarbonLockFactory
      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
        LockUsage.METADATA_LOCK)
    val compactionLock = CarbonLockFactory.getCarbonLockObj(carbonTable
      .getAbsoluteTableIdentifier, LockUsage.COMPACTION_LOCK)
    val updateLock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
      LockUsage.UPDATE_LOCK)
    var lockStatus = false
    // get the current timestamp, which should be the same for the delete and update steps.
    val currentTime = CarbonUpdateUtil.readCurrentTime
    var dataSet: DataFrame = null
    val isPersistEnabled = CarbonProperties.getInstance.isPersistUpdateDataset
    var hasHorizontalCompactionException = false
    var hasUpdateException = false
    var fileTimestamp = ""
    var updateTableModel: UpdateTableModel = null
    try {
      lockStatus = metadataLock.lockWithRetries()
      if (lockStatus) {
        logInfo("Successfully able to get the table metadata file lock")
      } else {
        throw new Exception("Table is locked for update. Please try after some time")
      }

      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
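      // Take the update lock first, then the compaction lock; failure to acquire either one
      // is reported as a ConcurrentOperationException below.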
      if (updateLock.lockWithRetries()) {
        if (compactionLock.lockWithRetries()) {
          // Get RDD.
          dataSet = if (isPersistEnabled) {
            Dataset.ofRows(sparkSession, plan).persist(StorageLevel.fromString(
              CarbonProperties.getInstance.getUpdateDatasetStorageLevel()))
          } else {
            Dataset.ofRows(sparkSession, plan)
          }
          if (CarbonProperties.isUniqueValueCheckEnabled) {
            // If more than one value is present for the update key, the update must fail.
            val ds = dataSet.select(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)
              .groupBy(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)
              .count()
              .select("count")
              .filter(col("count") > lit(1))
              .limit(1)
              .collect()
            // tupleId identifies the source rows that are going to be replaced.
            // If the same tupleId appears more than once, the update key maps to more than
            // one value, which is undefined behavior.
            if (ds.length > 0 && ds(0).getLong(0) > 1) {
              throw new UnsupportedOperationException(
                "Update cannot be supported for 1 to N mapping, as more than one value is " +
                "present for the update key")
            }
          }

          // Delete step: write delete delta files for the rows being replaced and collect
          // the segments whose data is completely removed.
          val (segmentsToBeDeleted, updatedRowCountTemp, isUpdateRequired, tblStatusVersion) =
            DeleteExecution.deleteDeltaExecution(
              databaseNameOp,
              tableName,
              sparkSession,
              dataSet.rdd,
              currentTime + "",
              isUpdateOperation = true,
              executionErrors)

          if (executionErrors.failureCauses != FailureCauses.NONE) {
            throw new Exception(executionErrors.errorMsg)
          }

          updatedRowCount = updatedRowCountTemp
          updateTableModel =
            UpdateTableModel(true, currentTime, executionErrors, segmentsToBeDeleted, Option.empty)
          // Insert step: load the updated rows back into the table using the update model.
          performUpdate(dataSet,
            databaseNameOp,
            tableName,
            plan,
            sparkSession,
            updateTableModel,
            executionErrors)

          // pre-priming for update command
          DeleteExecution.reloadDistributedSegmentCache(carbonTable,
            segmentsToBeDeleted, operationContext)(sparkSession)

        } else {
          throw new ConcurrentOperationException(carbonTable, "compaction", "update")
        }
      } else {
        throw new ConcurrentOperationException(carbonTable, "update/delete", "update")
      }
      if (executionErrors.failureCauses != FailureCauses.NONE) {
        throw new Exception(executionErrors.errorMsg)
      }
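      // With multi-version table status enabled, the delete/update steps may have produced a
      // new status version, so refresh the table instance before horizontal compaction.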
      if (CarbonProperties.isTableStatusMultiVersionEnabled) {
        carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
      }
      // Do IUD Compaction.
      HorizontalCompaction.tryHorizontalCompaction(
        sparkSession, carbonTable)

      // Truncate materialized views on the current table.
      val viewManager = MVManagerInSpark.get(sparkSession)
      val viewSchemas = viewManager.getSchemasOnTable(carbonTable)
      if (!viewSchemas.isEmpty) {
        viewManager.onTruncate(viewSchemas)
      }

      // trigger event for Update table
      val updateTablePostEvent: UpdateTablePostEvent =
        UpdateTablePostEvent(sparkSession, carbonTable)
      OperationListenerBus.getInstance.fireEvent(updateTablePostEvent, operationContext)
    } catch {
      case e: HorizontalCompactionException =>
        LOGGER.error(
          "Update operation passed. Exception in Horizontal Compaction. Please check logs. " + e)
        // In case of failure, clean all related delta files
        fileTimestamp = e.compactionTimeStamp.toString
        hasHorizontalCompactionException = true
      case e: Exception =>
        LOGGER.error("Exception in update operation", e)
        fileTimestamp = currentTime + ""
        hasUpdateException = true
        if (null != e.getMessage) {
          sys.error("Update operation failed. " + e.getMessage)
        }
        if (null != e.getCause && null != e.getCause.getMessage) {
          sys.error("Update operation failed. " + e.getCause.getMessage)
        }
        sys.error("Update operation failed. please check logs.")
    } finally {
      // In case of failure, clean new inserted segment,
      // change the status of new segment to 'mark for delete' from 'success'
      if (hasUpdateException && null != updateTableModel
        && updateTableModel.insertedSegment.isDefined) {
        CarbonLoaderUtil.updateTableStatusInCaseOfFailure(updateTableModel.insertedSegment.get,
          carbonTable, SegmentStatus.SUCCESS)
      }

      if (updateLock.unlock()) {
        LOGGER.info(s"updateLock unlocked successfully after update $tableName")
      } else {
        LOGGER.error(s"Unable to unlock updateLock for table $tableName after table update");
      }

      if (compactionLock.unlock()) {
        LOGGER.info(s"compactionLock unlocked successfully after update $tableName")
      } else {
        LOGGER.error(s"Unable to unlock compactionLock for " +
          s"table $tableName after update");
      }

      if (lockStatus) {
        CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK)
      }

      if (null != dataSet && isPersistEnabled) {
        try {
          dataSet.unpersist()
        } catch {
          case e: Exception =>
            LOGGER.error(s"Exception in update $tableName" + e.getMessage, e)
        }
      }

      // In case of failure, clean all related delete delta files.
      if (hasHorizontalCompactionException || hasUpdateException) {
        // When the table has too many segments this can take a long time,
        // so it is done at the end, outside of the locked section.
        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, fileTimestamp)
      }
    }
    Seq(Row(updatedRowCount))
  }
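
For context, a minimal sketch of how this command is typically reached. The session configuration, table name, and column names below are illustrative assumptions, not taken from the source; an UPDATE statement issued against a CarbonData table is planned as CarbonProjectForUpdateCommand, whose processData() above acquires the locks, writes the delete deltas, re-inserts the updated rows, and then attempts horizontal compaction.

import org.apache.spark.sql.SparkSession

// Hypothetical session and table; names are assumptions for illustration only.
val spark = SparkSession.builder()
  .appName("carbon-update-example")
  .config("spark.sql.extensions", "org.apache.spark.sql.CarbonExtensions")
  .getOrCreate()

// CarbonData UPDATE syntax: SET takes a parenthesised column list and value list.
spark.sql(
  """UPDATE sales_carbon
    |SET (status) = ('shipped')
    |WHERE order_id = 42""".stripMargin)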