backend/transaction/resolve.cc (311 lines of code) (raw):
//
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include "backend/transaction/resolve.h"
#include <algorithm>
#include <optional>
#include <string>
#include <utility>
#include <vector>
#include "absl/status/statusor.h"
#include "absl/strings/match.h"
#include "absl/strings/string_view.h"
#include "backend/access/write.h"
#include "backend/common/case.h"
#include "backend/common/rows.h"
#include "backend/datamodel/key_set.h"
#include "backend/schema/catalog/change_stream.h"
#include "backend/schema/catalog/column.h"
#include "backend/schema/catalog/schema.h"
#include "backend/transaction/commit_timestamp.h"
#include "common/change_stream.h"
#include "common/constants.h"
#include "common/errors.h"
#include "absl/status/status.h"
#include "zetasql/base/status_macros.h"
namespace google {
namespace spanner {
namespace emulator {
namespace backend {
namespace {
// Returns a copy of `key_set` in which every key, and both endpoints of every
// key range, carry the descending / nulls-last flags of the primary key
// columns of `table`. Column i of a key is paired with primary key column i.
KeySet SetSortOrder(const Table* table, const KeySet& key_set) {
  // Copies the ordering flags of the table's primary key columns onto `key`,
  // position by position.
  const auto apply_primary_key_order = [table](Key& key) {
    for (int col = 0; col < key.NumColumns(); ++col) {
      const auto* key_column = table->primary_key()[col];
      key.SetColumnDescending(col, key_column->is_descending());
      key.SetColumnNullsLast(col, key_column->is_nulls_last());
    }
  };

  KeySet ordered;

  // Point keys.
  for (const auto& source_key : key_set.keys()) {
    Key key_copy(source_key);
    apply_primary_key_order(key_copy);
    ordered.AddKey(key_copy);
  }

  // Key ranges: the start and the limit key are adjusted independently, since
  // they may have a different number of columns.
  for (const auto& source_range : key_set.ranges()) {
    KeyRange range_copy(source_range);
    apply_primary_key_order(range_copy.start_key());
    apply_primary_key_order(range_copy.limit_key());
    ordered.AddRange(range_copy);
  }

  return ordered;
}
void CanonicalizeKeySetForTable(const KeySet& set, const Table* table,
std::vector<KeyRange>* ranges) {
KeySet ordered_set = SetSortOrder(table, set);
MakeDisjointKeyRanges(ordered_set, ranges);
}
// Returns error::MultipleValuesForColumn for the first name in `column_names`
// that was already seen earlier in the list (names are collected in a
// CaseInsensitiveStringSet); returns OK when no name repeats.
absl::Status ValidateColumnsAreNotDuplicate(
    const std::vector<std::string>& column_names) {
  CaseInsensitiveStringSet seen_names;
  for (const std::string& name : column_names) {
    // insert() reports through `.second` whether the name was newly added.
    const bool newly_added = seen_names.insert(name).second;
    if (!newly_added) {
      return error::MultipleValuesForColumn(name);
    }
  }
  return absl::OkStatus();
}
// Checks that `columns` contains every column of `table` that is
// non-nullable, not generated and has no default value. Returns
// error::NonNullValueNotSpecifiedForInsert for the first such column that is
// missing, and OK otherwise.
absl::Status ValidateNotNullColumnsPresent(
    const Table* table, const std::vector<const Column*>& columns) {
  // TODO: Find a way of doing this without creating a hash set for
  // every insert.
  absl::flat_hash_set<const Column*> supplied_not_null;
  for (const Column* supplied : columns) {
    if (supplied->is_nullable()) continue;
    supplied_not_null.insert(supplied);
  }

  for (const Column* candidate : table->columns()) {
    // Writing null to non-nullable generated and default columns is
    // temporarily fine, since the violation may be fixed later by a generated
    // operation to update the column.
    const bool value_required = !candidate->is_nullable() &&
                                !candidate->is_generated() &&
                                !candidate->has_default_value();
    if (value_required && !supplied_not_null.contains(candidate)) {
      return error::NonNullValueNotSpecifiedForInsert(table->Name(),
                                                      candidate->Name());
    }
  }
  return absl::OkStatus();
}
// Rejects writes to generated columns. The only generated column that may
// appear in `columns` is one that is a key column of `table`, and only when
// `op_type` is kUpdate; any other generated column yields
// error::CannotWriteToGeneratedColumn.
absl::Status ValidateGeneratedColumnsNotPresent(
    const Table* table, const std::vector<const Column*>& columns,
    MutationOpType op_type) {
  for (const Column* written : columns) {
    if (!written->is_generated()) continue;

    const bool is_generated_key_in_update =
        table->FindKeyColumn(written->Name()) != nullptr &&
        op_type == MutationOpType::kUpdate;
    if (!is_generated_key_in_update) {
      return error::CannotWriteToGeneratedColumn(table->Name(),
                                                 written->Name());
    }
  }
  return absl::OkStatus();
}
bool AreAllDependentColumnsPresent(const Column* column,
const std::vector<const Column*>& columns) {
// Checks if user has supplied values for all dependent columns of a key
// generated column.
ABSL_DCHECK(column->is_generated());
for (const Column* dep_col : column->dependent_columns()) {
// A generated dependent column should have all dependent columns on the
// generated column present.
// Non-generated dependent column should be specified by user.
// If a dependent column is a default column and value is not provided,
// default value will be used to evaluate the key column value.
if (dep_col->has_default_value()) continue;
if (dep_col->is_generated() &&
column->table()->FindKeyColumn(dep_col->Name()) != nullptr) {
if (!AreAllDependentColumnsPresent(dep_col, columns)) {
return false;
}
} else if (std::find(columns.begin(), columns.end(), dep_col) ==
columns.end()) {
return false;
}
}
return true;
}
// Validates the primary key columns of `table` that the user did NOT list in
// `columns`:
//   * UPDATE: a key column with a default value must be given explicitly
//     (error::DefaultPKNeedsExplicitValue), and a generated key column must
//     either be given explicitly or have all of its dependent columns present
//     (error::GeneratedPKNeedsExplicitValue).
//   * Other ops (insert, insert-or-update, replace): every generated key
//     column must have all of its dependent columns present
//     (error::NeedAllDependentColumnsForGpk).
absl::Status ValidateDefaultAndGeneratedKeys(
    const Table* table, const std::vector<const Column*>& columns,
    MutationOpType op_type) {
  const bool is_update = op_type == MutationOpType::kUpdate;

  for (const KeyColumn* key_column : table->primary_key()) {
    const Column* key = key_column->column();

    // Key columns with a user-supplied value need no further checks.
    if (std::find(columns.begin(), columns.end(), key) != columns.end()) {
      continue;
    }

    // For update mutations user should supply explicit value for default key
    // columns.
    if (key->has_default_value() && is_update) {
      return error::DefaultPKNeedsExplicitValue(key->FullName(), "UPDATE");
    }

    if (key->is_generated() && !AreAllDependentColumnsPresent(key, columns)) {
      if (is_update) {
        // The user must either supply the generated key column itself or all
        // of the columns it depends on.
        return error::GeneratedPKNeedsExplicitValue(key->FullName());
      }
      // For mutations other than update, the user must supply all of the
      // columns the generated key column depends on.
      return error::NeedAllDependentColumnsForGpk(key->FullName());
    }
  }
  return absl::OkStatus();
}
// For an UPDATE op: every primary key column of `table` that has a default
// value must appear in `columns`. Returns error::DefaultPKNeedsExplicitValue
// for the first one that does not, and OK otherwise.
ABSL_ATTRIBUTE_UNUSED absl::Status ValidateDefaultKeysInUpdateOp(
    const Table* table, const std::vector<const Column*>& columns) {
  for (const KeyColumn* key_column : table->primary_key()) {
    const Column* key = key_column->column();
    if (!key->has_default_value()) continue;

    const bool listed_in_op =
        std::find(columns.begin(), columns.end(), key) != columns.end();
    if (!listed_in_op) {
      // This key column has to be present in the column list of the op.
      return error::DefaultPKNeedsExplicitValue(key->FullName(), "UPDATE");
    }
  }
  return absl::OkStatus();
}
} // namespace
// Extracts the primary key column indices from the given list of columns.
//
// The result has one entry per column of `primary_key`, in primary key order.
// Each entry is the position in `columns` of the first column whose name
// matches the key column's name (compared case-insensitively), or
// std::nullopt when the key column is absent from `columns` and is nullable.
//
// Returns error::NullValueForNotNullColumn if a non-nullable key column is
// absent from `columns`.
absl::StatusOr<std::vector<std::optional<int>>> ExtractPrimaryKeyIndices(
    absl::Span<const Column* const> columns,
    absl::Span<const KeyColumn* const> primary_key) {
  std::vector<std::optional<int>> key_indices;
  key_indices.reserve(primary_key.size());

  // Number of columns should be relatively small, so we just iterate over them
  // to find matches.
  for (const KeyColumn* key_column : primary_key) {
    const Column* key = key_column->column();

    // Column names are case-insensitive in Cloud Spanner.
    const auto match = std::find_if(
        columns.begin(), columns.end(), [key](const Column* candidate) {
          return absl::EqualsIgnoreCase(key->Name(), candidate->Name());
        });

    if (match != columns.end()) {
      key_indices.push_back(static_cast<int>(match - columns.begin()));
      continue;
    }

    // The key column was not found in the column list.
    if (!key->is_nullable()) {
      return error::NullValueForNotNullColumn(key->table()->Name(),
                                              key->Name());
    }
    key_indices.push_back(std::nullopt);
  }
  return key_indices;
}
// Builds the primary key for `row`. Key column i takes its value from
// row[key_indices[i]] when key_indices[i] holds an index, and otherwise a
// NULL value of the key column's type. Each key column is added with the
// descending / nulls-last flags of the corresponding entry in `primary_key`.
Key ComputeKey(const ValueList& row,
               absl::Span<const KeyColumn* const> primary_key,
               const std::vector<std::optional<int>>& key_indices) {
  Key result;
  for (int pos = 0; pos < primary_key.size(); ++pos) {
    const KeyColumn* key_column = primary_key[pos];
    const std::optional<int>& row_index = key_indices[pos];
    const bool descending = key_column->is_descending();
    const bool nulls_last = key_column->is_nulls_last();

    if (row_index.has_value()) {
      result.AddColumn(row[*row_index], descending, nulls_last);
    } else {
      // No value was supplied for this key column; use a typed NULL.
      result.AddColumn(zetasql::Value::Null(key_column->column()->GetType()),
                       descending, nulls_last);
    }
  }
  return result;
}
// Resolves `read_arg` against `schema` into a ResolvedReadArg holding the
// table to read, the columns to read and the canonicalized key ranges.
//
// The table is chosen as follows:
//   * `change_stream_for_partition_table` set: that change stream's partition
//     table.
//   * otherwise `change_stream_for_data_table` set: that change stream's data
//     table.
//   * otherwise: the table named by `read_arg.table`.
// If `read_arg.index` is set, the index must be defined on the table chosen
// above, and the read is redirected to the index data table.
//
// Errors returned:
//   * error::ChangeStreamNotFound / error::TableNotFound / error::IndexNotFound
//     when the named schema object does not exist.
//   * error::IndexTableDoesNotMatchBaseTable when the index is defined on a
//     different table.
//   * error::ColumnNotFound when a column is in neither the table being read
//     nor (for index reads) the indexed table.
//   * error::ColumnNotFoundInIndex when a column exists in the indexed table
//     but not in the index data table.
absl::StatusOr<ResolvedReadArg> ResolveReadArg(const ReadArg& read_arg,
                                               const Schema* schema) {
  const Table* read_table = nullptr;
  if (!read_arg.change_stream_for_partition_table.empty()) {
    // Look the change stream up once and reuse the result.
    const auto* change_stream =
        schema->FindChangeStream(read_arg.change_stream_for_partition_table);
    if (change_stream == nullptr) {
      return error::ChangeStreamNotFound(
          read_arg.change_stream_for_partition_table);
    }
    read_table = change_stream->change_stream_partition_table();
  } else if (!read_arg.change_stream_for_data_table.empty()) {
    const auto* change_stream =
        schema->FindChangeStream(read_arg.change_stream_for_data_table);
    if (change_stream == nullptr) {
      return error::ChangeStreamNotFound(read_arg.change_stream_for_data_table);
    }
    read_table = change_stream->change_stream_data_table();
  } else {
    // Find the table to read from schema.
    read_table = schema->FindTable(read_arg.table);
    if (read_table == nullptr) {
      return error::TableNotFound(read_arg.table);
    }
  }

  // Check if index should be read from instead.
  const Index* index = nullptr;
  if (!read_arg.index.empty()) {
    index = schema->FindIndex(read_arg.index);
    if (index == nullptr) {
      return error::IndexNotFound(read_arg.index, read_arg.table);
    }
    if (index->indexed_table() != read_table) {
      return error::IndexTableDoesNotMatchBaseTable(
          read_table->Name(), index->indexed_table()->Name(), index->Name());
    }
    read_table = index->index_data_table();
  }

  // Find the columns to read from schema.
  std::vector<const Column*> columns;
  columns.reserve(read_arg.columns.size());
  for (const auto& column_name : read_arg.columns) {
    const Column* column = read_table->FindColumn(column_name);
    if (column == nullptr) {
      if (index == nullptr) {
        return error::ColumnNotFound(read_table->Name(), column_name);
      }
      // Distinguish "column does not exist at all" from "column exists in the
      // base table but is not part of the index".
      const Column* base_column =
          index->indexed_table()->FindColumn(column_name);
      if (base_column == nullptr) {
        return error::ColumnNotFound(index->indexed_table()->Name(),
                                     column_name);
      }
      return error::ColumnNotFoundInIndex(
          index->Name(), index->indexed_table()->Name(), column_name);
    }
    columns.push_back(column);
  }

  // Convert key set to canonicalized key ranges.
  std::vector<KeyRange> key_ranges;
  CanonicalizeKeySetForTable(read_arg.key_set, read_table, &key_ranges);

  ResolvedReadArg resolved_read_arg;
  resolved_read_arg.table = read_table;
  resolved_read_arg.key_ranges = std::move(key_ranges);
  // `columns` is not used past this point, so hand it over without copying.
  resolved_read_arg.columns = std::move(columns);
  return resolved_read_arg;
}
// Returns the partition table of the change stream that
// `change_stream_partition_table_name` refers to. The change stream name is
// taken to be whatever follows the first
// `sizeof(kChangeStreamPartitionTablePrefix) - 1` characters of the table
// name; the prefix itself is not verified here.
// NOTE(review): the length computation presumably relies on
// kChangeStreamPartitionTablePrefix being a char array literal — confirm.
//
// Returns error::ChangeStreamNotFound if `schema` has no such change stream.
absl::StatusOr<const Table*> FindChangeStreamPartitionTable(
    const Schema* schema, std::string change_stream_partition_table_name) {
  constexpr size_t kPrefixLength =
      sizeof(kChangeStreamPartitionTablePrefix) - 1;
  const std::string change_stream_name(
      absl::ClippedSubstr(change_stream_partition_table_name, kPrefixLength));

  auto change_stream = schema->FindChangeStream(change_stream_name);
  if (change_stream == nullptr) {
    return error::ChangeStreamNotFound(change_stream_name);
  }
  return change_stream->change_stream_partition_table();
}
// Validates a non-delete mutation op against `schema`. Checks, in order:
//   1. the target table exists (change stream partition tables are resolved
//      through FindChangeStreamPartitionTable),
//   2. no column is listed more than once,
//   3. every listed column resolves via GetColumnsByName,
//   4. default / generated key columns are adequately specified,
//   5. for ops other than UPDATE, all required not-null columns are present,
//   6. no disallowed generated column is written.
// Returns the first error encountered, or OK.
absl::Status ValidateNonDeleteMutationOp(const MutationOp& mutation_op,
                                         const Schema* schema) {
  ZETASQL_RET_CHECK(mutation_op.type != MutationOpType::kDelete);

  const Table* target_table = schema->FindTable(mutation_op.table);
  if (IsChangeStreamPartitionTable(mutation_op.table)) {
    ZETASQL_ASSIGN_OR_RETURN(
        target_table,
        FindChangeStreamPartitionTable(schema, mutation_op.table));
  }
  if (target_table == nullptr) {
    return error::TableNotFound(mutation_op.table);
  }

  ZETASQL_RETURN_IF_ERROR(ValidateColumnsAreNotDuplicate(mutation_op.columns));
  ZETASQL_ASSIGN_OR_RETURN(std::vector<const Column*> op_columns,
                   GetColumnsByName(target_table, mutation_op.columns));
  ZETASQL_RETURN_IF_ERROR(ValidateDefaultAndGeneratedKeys(target_table, op_columns,
                                                  mutation_op.type));

  const bool is_update = mutation_op.type == MutationOpType::kUpdate;
  if (!is_update) {
    // Insert, InsertOrUpdate and Replace mutation ops require that all
    // not-null columns be present in the mutation. Note: this check is
    // specifically done before InsertOrUpdate & Replace mutation ops are
    // flattened.
    ZETASQL_RETURN_IF_ERROR(ValidateNotNullColumnsPresent(target_table, op_columns));
  }

  ZETASQL_RETURN_IF_ERROR(ValidateGeneratedColumnsNotPresent(
      target_table, op_columns, mutation_op.type));
  return absl::OkStatus();
}
// Resolves a delete mutation op against `schema`: looks up the target table,
// validates the key set with ValidateCommitTimestampKeySetForDeleteOp (using
// `now`), canonicalizes the key set into key ranges, and passes each range
// through MaybeSetCommitTimestampSentinel.
//
// Returns error::TableNotFound if the table does not exist, or the error of
// whichever validation / conversion step fails.
absl::StatusOr<ResolvedMutationOp> ResolveDeleteMutationOp(
    const MutationOp& mutation_op, const Schema* schema, absl::Time now) {
  ZETASQL_RET_CHECK(mutation_op.type == MutationOpType::kDelete);

  const Table* target_table = schema->FindTable(mutation_op.table);
  if (target_table == nullptr) {
    return error::TableNotFound(mutation_op.table);
  }

  // Invalid commit timestamp key ranges for delete ops need to be caught
  // before CanonicalizeKeySetForTable filters out invalid key ranges with
  // future timestamp(s).
  ZETASQL_RETURN_IF_ERROR(ValidateCommitTimestampKeySetForDeleteOp(
      target_table, mutation_op.key_set, now));

  std::vector<KeyRange> canonical_ranges;
  CanonicalizeKeySetForTable(mutation_op.key_set, target_table,
                             &canonical_ranges);

  ResolvedMutationOp resolved;
  resolved.table = target_table;
  resolved.type = mutation_op.type;
  for (const KeyRange& canonical_range : canonical_ranges) {
    // Transaction store may contain commit timestamp sentinel value until
    // flush, if requested so in a previous mutation. Hence, convert key
    // values to timestamp sentinel value to delete such buffered rows.
    ZETASQL_ASSIGN_OR_RETURN(KeyRange sentinel_range,
                     MaybeSetCommitTimestampSentinel(
                         target_table->primary_key(), canonical_range));
    resolved.key_ranges.push_back(std::move(sentinel_range));
  }
  return resolved;
}
} // namespace backend
} // namespace emulator
} // namespace spanner
} // namespace google