in integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala [59:286]
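// Flow: validate the table and target columns, acquire the metadata, compaction
// and update locks, write delete deltas for the matched rows, insert the updated
// rows, then run horizontal compaction and fire the post-update event.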
override def processData(sparkSession: SparkSession): Seq[Row] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
var updatedRowCount = 0L
IUDCommonUtil.checkIfSegmentListIsSet(sparkSession, plan)
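// Bail out early with zero updated rows if the plan does not reference a Carbon relation.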
val res = plan find {
case relation: LogicalRelation if relation.relation
.isInstanceOf[CarbonDatasourceHadoopRelation] =>
true
case _ => false
}
if (res.isEmpty) {
return Seq(Row(updatedRowCount))
}
var carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
setAuditTable(carbonTable)
setAuditInfo(Map("plan" -> plan.prettyJson))
// Do not allow spatial index and its source columns to be updated.
AlterTableUtil.validateColumnsWithSpatialIndexProperties(carbonTable, columns)
columns.foreach { col =>
val dataType = carbonTable.getColumnByName(col).getColumnSchema.getDataType
if (dataType.isComplexType) {
throw new UnsupportedOperationException("Unsupported operation on Complex data type")
}
}
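// Updates are only allowed on transactional tables with no load in progress.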
if (!carbonTable.getTableInfo.isTransactionalTable) {
throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
}
if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
throw new ConcurrentOperationException(carbonTable, "loading", "data update")
}
if (!carbonTable.canAllow(carbonTable, TableOperation.UPDATE)) {
throw new MalformedCarbonCommandException(
"update operation is not supported for index")
}
// Block the update operation for non carbon formats
if (MixedFormatHandler.otherFormatSegmentsExist(carbonTable.getMetadataPath,
carbonTable.getTableStatusVersion)) {
throw new MalformedCarbonCommandException(
s"Unsupported update operation on table containing mixed format segments")
}
// Trigger the pre-update event.
val operationContext = new OperationContext
val updateTablePreEvent: UpdateTablePreEvent =
UpdateTablePreEvent(sparkSession, carbonTable)
operationContext.setProperty("isLoadOrCompaction", false)
OperationListenerBus.getInstance.fireEvent(updateTablePreEvent, operationContext)
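// Three locks guard the update: METADATA_LOCK serializes table metadata changes,
// COMPACTION_LOCK blocks concurrent compaction, and UPDATE_LOCK blocks
// concurrent update/delete operations.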
val metadataLock = CarbonLockFactory
.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
LockUsage.METADATA_LOCK)
val compactionLock = CarbonLockFactory.getCarbonLockObj(
carbonTable.getAbsoluteTableIdentifier, LockUsage.COMPACTION_LOCK)
val updateLock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
LockUsage.UPDATE_LOCK)
var lockStatus = false
// Get the current timestamp, which must be the same for the delete and the update.
val currentTime = CarbonUpdateUtil.readCurrentTime
var dataSet: DataFrame = null
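// Persisting the update dataset avoids recomputing the plan for both the
// delete and the insert passes below.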
val isPersistEnabled = CarbonProperties.getInstance.isPersistUpdateDataset
var hasHorizontalCompactionException = false
var hasUpdateException = false
var fileTimestamp = ""
var updateTableModel: UpdateTableModel = null
try {
lockStatus = metadataLock.lockWithRetries()
if (lockStatus) {
logInfo("Successfully acquired the table metadata file lock")
} else {
throw new Exception("Table is locked for update. Please try again later.")
}
val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
if (updateLock.lockWithRetries()) {
if (compactionLock.lockWithRetries()) {
// Get RDD.
dataSet = if (isPersistEnabled) {
Dataset.ofRows(sparkSession, plan).persist(StorageLevel.fromString(
CarbonProperties.getInstance.getUpdateDatasetStorageLevel()))
} else {
Dataset.ofRows(sparkSession, plan)
}
if (CarbonProperties.isUniqueValueCheckEnabled) {
// If more than one value present for the update key, should fail the update
val ds = dataSet.select(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)
.groupBy(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)
.count()
.select("count")
.filter(col("count") > lit(1))
.limit(1)
.collect()
// tupleId identifies the source rows that are going to be replaced.
// If the same tupleId appears more than once, the update key maps to more than
// one new value, which is undefined behavior.
if (ds.length > 0 && ds(0).getLong(0) > 1) {
throw new UnsupportedOperationException(
"update cannot be supported for 1 to N mapping, as more than one value is " +
"present for the update key")
}
}
// Delete step: write delete delta files for the rows being replaced.
val (segmentsToBeDeleted, updatedRowCountTemp, isUpdateRequired, tblStatusVersion) =
DeleteExecution.deleteDeltaExecution(
databaseNameOp,
tableName,
sparkSession,
dataSet.rdd,
currentTime + "",
isUpdateOperation = true,
executionErrors)
if (executionErrors.failureCauses != FailureCauses.NONE) {
throw new Exception(executionErrors.errorMsg)
}
updatedRowCount = updatedRowCountTemp
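// Carry the delete timestamp and the fully deleted segments into the insert step.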
updateTableModel =
UpdateTableModel(true, currentTime, executionErrors, segmentsToBeDeleted, Option.empty)
// Insert step: write the updated rows as new data.
performUpdate(dataSet,
databaseNameOp,
tableName,
plan,
sparkSession,
updateTableModel,
executionErrors)
// Pre-prime the distributed segment cache for the updated segments.
DeleteExecution.reloadDistributedSegmentCache(carbonTable,
segmentsToBeDeleted, operationContext)(sparkSession)
} else {
throw new ConcurrentOperationException(carbonTable, "compaction", "update")
}
} else {
throw new ConcurrentOperationException(carbonTable, "update/delete", "update")
}
if (executionErrors.failureCauses != FailureCauses.NONE) {
throw new Exception(executionErrors.errorMsg)
}
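// With multi-version table status enabled, re-read the table so compaction
// sees the latest table status version.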
if (CarbonProperties.isTableStatusMultiVersionEnabled) {
carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
}
// Do IUD Compaction.
HorizontalCompaction.tryHorizontalCompaction(
sparkSession, carbonTable)
// Truncate materialized views on the current table.
val viewManager = MVManagerInSpark.get(sparkSession)
val viewSchemas = viewManager.getSchemasOnTable(carbonTable)
if (!viewSchemas.isEmpty) {
viewManager.onTruncate(viewSchemas)
}
// Trigger the post-update event.
val updateTablePostEvent: UpdateTablePostEvent =
UpdateTablePostEvent(sparkSession, carbonTable)
OperationListenerBus.getInstance.fireEvent(updateTablePostEvent, operationContext)
} catch {
case e: HorizontalCompactionException =>
LOGGER.error(
"Update operation succeeded, but horizontal compaction failed. Please check logs. " + e)
// Record the timestamp so the related delta files are cleaned in the finally block.
fileTimestamp = e.compactionTimeStamp.toString
hasHorizontalCompactionException = true
case e: Exception =>
LOGGER.error("Exception in update operation", e)
fileTimestamp = currentTime + ""
hasUpdateException = true
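// sys.error throws, so only the first non-null message below is raised.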
if (null != e.getMessage) {
sys.error("Update operation failed. " + e.getMessage)
}
if (null != e.getCause && null != e.getCause.getMessage) {
sys.error("Update operation failed. " + e.getCause.getMessage)
}
sys.error("Update operation failed. please check logs.")
} finally {
// In case of failure, clean the newly inserted segment by changing its
// status from 'success' to 'marked for delete'.
if (hasUpdateException && null != updateTableModel
&& updateTableModel.insertedSegment.isDefined) {
CarbonLoaderUtil.updateTableStatusInCaseOfFailure(updateTableModel.insertedSegment.get,
carbonTable, SegmentStatus.SUCCESS)
}
if (updateLock.unlock()) {
LOGGER.info(s"updateLock unlocked successfully after update $tableName")
} else {
LOGGER.error(s"Unable to unlock updateLock for table $tableName after table update");
}
if (compactionLock.unlock()) {
LOGGER.info(s"compactionLock unlocked successfully after update $tableName")
} else {
LOGGER.error(s"Unable to unlock compactionLock for " +
s"table $tableName after update");
}
if (lockStatus) {
CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK)
}
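// Release the persisted update dataset, if any.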
if (null != dataSet && isPersistEnabled) {
try {
dataSet.unpersist()
} catch {
case e: Exception =>
LOGGER.error(s"Exception in update $tableName" + e.getMessage, e)
}
}
// In case of failure, clean all related delete delta files.
// When the table has too many segments, this can take a long time,
// so it is done at the end, outside of the locks.
if (hasHorizontalCompactionException || hasUpdateException) {
CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, fileTimestamp)
}
}
Seq(Row(updatedRowCount))
}