in backend/transaction/read_write_transaction.cc [469:574]
absl::Status ReadWriteTransaction::Write(const Mutation& mutation) {
return GuardedCall(OpType::kWrite, [&]() -> absl::Status {
mu_.AssertHeld();
ForeignKeyRestrictions fk_restrictions;
// When writing, comparison may be required in the process. Comparison of
// PG.NUMERIC calls PG to get comparison result (see function
// ValueContentLess in datatypes/extended/pg_numeric_type.cc) and therefore
// requires a PG arena.
ZETASQL_VLOG(1) << "Creating memory context and Processing Write mutations";
ZETASQL_ASSIGN_OR_RETURN(
std::unique_ptr<postgres_translator::interfaces::PGArena> arena,
postgres_translator::spangres::MemoryContextPGArena::Init(nullptr));
for (const MutationOp& mutation_op : mutation.ops()) {
ZETASQL_ASSIGN_OR_RETURN(
bool has_delete_cascade_foreign_key,
IsMutationInvolvingForeignKeyAction(mutation_op, schema_));
if (mutation_op.type == MutationOpType::kDelete) {
// Process Delete.
ZETASQL_ASSIGN_OR_RETURN(
ResolvedMutationOp resolved_mutation_op,
ResolveDeleteMutationOp(mutation_op, schema_, clock_->Now()));
const std::string& table_name = resolved_mutation_op.table->Name();
if (has_delete_cascade_foreign_key) {
ZETASQL_RETURN_IF_ERROR(fk_restrictions.ValidateReferencedDeleteMods(
table_name, resolved_mutation_op.key_ranges));
}
std::vector<KeyRange>& key_ranges =
deleted_key_ranges_by_table_[table_name];
key_ranges.insert(key_ranges.end(),
resolved_mutation_op.key_ranges.begin(),
resolved_mutation_op.key_ranges.end());
ZETASQL_ASSIGN_OR_RETURN(std::vector<WriteOp> write_ops,
FlattenDeleteOp(resolved_mutation_op.table,
resolved_mutation_op.key_ranges,
transaction_store_.get()));
ZETASQL_RETURN_IF_ERROR(ProcessWriteOps(write_ops));
} else {
// Process non-delete Mutation ops.
ZETASQL_RETURN_IF_ERROR(ValidateNonDeleteMutationOp(mutation_op, schema_));
ZETASQL_ASSIGN_OR_RETURN(ResolvedMutationOp resolved_mutation_op,
ResolveNonDeleteMutationOp(mutation_op, schema_));
const std::string& table_name = resolved_mutation_op.table->Name();
// Process Insert, Update, Replace and InsertOrUpdate.
for (int i = 0; i < resolved_mutation_op.rows.size(); i++) {
// Spanner allows deleted entries to be reinserted within the same
// transaction, so we must update the deleted ranges list in this
// case.
if (resolved_mutation_op.type == MutationOpType::kInsert ||
resolved_mutation_op.type == MutationOpType::kInsertOrUpdate) {
std::vector<KeyRange> split_key_ranges;
auto& deleted_key_ranges = deleted_key_ranges_by_table_[table_name];
for (auto it = deleted_key_ranges.begin();
it != deleted_key_ranges.end();) {
if (SplitKeyRangeAndAppend(*it, resolved_mutation_op.keys[i],
&split_key_ranges)) {
it = deleted_key_ranges.erase(it);
} else {
++it;
}
}
// Insert split key ranges back into the list of deleted key ranges.
for (const KeyRange& key_range : split_key_ranges) {
deleted_key_ranges.push_back(key_range);
}
}
if (resolved_mutation_op.type == MutationOpType::kUpdate) {
for (const KeyRange& key_range :
deleted_key_ranges_by_table_[table_name]) {
if (key_range.Contains(resolved_mutation_op.keys[i])) {
return error::UpdateDeletedRowInTransaction(
table_name, resolved_mutation_op.keys[i].DebugString());
}
}
}
ZETASQL_ASSIGN_OR_RETURN(
std::vector<WriteOp> write_ops,
FlattenNonDeleteOpRow(
resolved_mutation_op.type, resolved_mutation_op.table,
resolved_mutation_op.columns, resolved_mutation_op.keys[i],
resolved_mutation_op.rows[i], transaction_store_.get()));
if (has_delete_cascade_foreign_key) {
ZETASQL_RETURN_IF_ERROR(fk_restrictions.ValidateReferencedMods(
write_ops, table_name, schema_));
}
ZETASQL_RETURN_IF_ERROR(ProcessWriteOps(write_ops));
}
}
}
ZETASQL_RETURN_IF_ERROR(ApplyStatementVerifiers());
// We defer all commit timestamp tracking to the end of the write to avoid
// effector reads being rejected because of a previously written op in the
// same call to ReadWriteTransaction::Write (e.g. updates to two rows in the
// same table with an indexed commit timestamp column can be rejected due
// to effector reads if the updates are split into separate Write calls, but
// should succeed if written together).
UpdateTrackedCommitTimestamps();
return absl::OkStatus();
});
}