absl::Status ReadWriteTransaction::Write()

in backend/transaction/read_write_transaction.cc [469:574]


// Applies every op in `mutation` to this transaction.
//
// The work runs inside GuardedCall(OpType::kWrite, ...) and asserts that mu_
// is held. Ops are handled in the order they appear in `mutation.ops()`:
//
//  * Delete ops are resolved (using clock_->Now()), their key ranges are
//    appended to deleted_key_ranges_by_table_ for the target table, and the
//    flattened write ops are handed to ProcessWriteOps().
//  * All other ops are validated and resolved, then handled one row at a time:
//      - Insert / InsertOrUpdate: the row's key is carved out of any
//        previously recorded deleted key range for the table.
//      - Update: if the row's key lies inside a recorded deleted key range,
//        error::UpdateDeletedRowInTransaction is returned.
//      - The row is flattened into write ops and handed to ProcessWriteOps().
//  * When IsMutationInvolvingForeignKeyAction() reports true for an op, the
//    op's modifications are additionally checked through a
//    ForeignKeyRestrictions instance shared by all ops of this call.
//
// After all ops have been processed, ApplyStatementVerifiers() runs and the
// tracked commit timestamps are updated.
//
// Returns OkStatus on success, otherwise the first error encountered. This
// function itself does not undo work done for earlier ops when a later op
// fails.
absl::Status ReadWriteTransaction::Write(const Mutation& mutation) {
  return GuardedCall(OpType::kWrite, [&]() -> absl::Status {
    mu_.AssertHeld();
    ForeignKeyRestrictions fk_restrictions;

    // When writing, comparison may be required in the process. Comparison of
    // PG.NUMERIC calls PG to get comparison result (see function
    // ValueContentLess in datatypes/extended/pg_numeric_type.cc) and therefore
    // requires a PG arena.
    //
    // NOTE(review): `arena` is never referenced below; presumably it only
    // needs to stay alive until this lambda returns -- confirm.
    ZETASQL_VLOG(1) << "Creating memory context and Processing Write mutations";
    ZETASQL_ASSIGN_OR_RETURN(
        std::unique_ptr<postgres_translator::interfaces::PGArena> arena,
        postgres_translator::spangres::MemoryContextPGArena::Init(nullptr));

    for (const MutationOp& mutation_op : mutation.ops()) {
      ZETASQL_ASSIGN_OR_RETURN(
          bool has_delete_cascade_foreign_key,
          IsMutationInvolvingForeignKeyAction(mutation_op, schema_));
      if (mutation_op.type == MutationOpType::kDelete) {
        // Process Delete.
        ZETASQL_ASSIGN_OR_RETURN(
            ResolvedMutationOp resolved_mutation_op,
            ResolveDeleteMutationOp(mutation_op, schema_, clock_->Now()));
        const std::string& table_name = resolved_mutation_op.table->Name();

        if (has_delete_cascade_foreign_key) {
          ZETASQL_RETURN_IF_ERROR(fk_restrictions.ValidateReferencedDeleteMods(
              table_name, resolved_mutation_op.key_ranges));
        }

        // Remember which ranges were deleted so that later ops in this
        // transaction can detect updates to (or re-inserts of) deleted rows.
        std::vector<KeyRange>& key_ranges =
            deleted_key_ranges_by_table_[table_name];
        key_ranges.insert(key_ranges.end(),
                          resolved_mutation_op.key_ranges.begin(),
                          resolved_mutation_op.key_ranges.end());
        ZETASQL_ASSIGN_OR_RETURN(std::vector<WriteOp> write_ops,
                         FlattenDeleteOp(resolved_mutation_op.table,
                                         resolved_mutation_op.key_ranges,
                                         transaction_store_.get()));
        ZETASQL_RETURN_IF_ERROR(ProcessWriteOps(write_ops));
      } else {
        // Process non-delete Mutation ops.
        ZETASQL_RETURN_IF_ERROR(ValidateNonDeleteMutationOp(mutation_op, schema_));
        ZETASQL_ASSIGN_OR_RETURN(ResolvedMutationOp resolved_mutation_op,
                         ResolveNonDeleteMutationOp(mutation_op, schema_));
        const std::string& table_name = resolved_mutation_op.table->Name();

        // Process Insert, Update, Replace and InsertOrUpdate.
        // `rows` and `keys` are indexed in parallel, hence the index loop. The
        // index is unsigned to match the type returned by size().
        const size_t num_rows = resolved_mutation_op.rows.size();
        for (size_t i = 0; i < num_rows; ++i) {
          // Spanner allows deleted entries to be reinserted within the same
          // transaction, so we must update the deleted ranges list in this
          // case.
          if (resolved_mutation_op.type == MutationOpType::kInsert ||
              resolved_mutation_op.type == MutationOpType::kInsertOrUpdate) {
            std::vector<KeyRange> split_key_ranges;
            auto& deleted_key_ranges = deleted_key_ranges_by_table_[table_name];
            // Every range for which SplitKeyRangeAndAppend() returns true is
            // dropped here; whatever that call appended to `split_key_ranges`
            // is added back once the scan is complete.
            for (auto it = deleted_key_ranges.begin();
                 it != deleted_key_ranges.end();) {
              if (SplitKeyRangeAndAppend(*it, resolved_mutation_op.keys[i],
                                         &split_key_ranges)) {
                it = deleted_key_ranges.erase(it);
              } else {
                ++it;
              }
            }
            // Insert split key ranges back into the list of deleted key ranges.
            deleted_key_ranges.insert(deleted_key_ranges.end(),
                                      split_key_ranges.begin(),
                                      split_key_ranges.end());
          }
          if (resolved_mutation_op.type == MutationOpType::kUpdate) {
            // A row deleted earlier in this transaction cannot be updated.
            for (const KeyRange& key_range :
                 deleted_key_ranges_by_table_[table_name]) {
              if (key_range.Contains(resolved_mutation_op.keys[i])) {
                return error::UpdateDeletedRowInTransaction(
                    table_name, resolved_mutation_op.keys[i].DebugString());
              }
            }
          }
          ZETASQL_ASSIGN_OR_RETURN(
              std::vector<WriteOp> write_ops,
              FlattenNonDeleteOpRow(
                  resolved_mutation_op.type, resolved_mutation_op.table,
                  resolved_mutation_op.columns, resolved_mutation_op.keys[i],
                  resolved_mutation_op.rows[i], transaction_store_.get()));

          if (has_delete_cascade_foreign_key) {
            ZETASQL_RETURN_IF_ERROR(fk_restrictions.ValidateReferencedMods(
                write_ops, table_name, schema_));
          }

          ZETASQL_RETURN_IF_ERROR(ProcessWriteOps(write_ops));
        }
      }
    }
    ZETASQL_RETURN_IF_ERROR(ApplyStatementVerifiers());
    // We defer all commit timestamp tracking to the end of the write to avoid
    // effector reads being rejected because of a previously written op in the
    // same call to ReadWriteTransaction::Write (e.g. updates to two rows in the
    // same table with an indexed commit timestamp column can be rejected due
    // to effector reads if the updates are split into separate Write calls, but
    // should succeed if written together).
    UpdateTrackedCommitTimestamps();
    return absl::OkStatus();
  });
}