backend/query/query_engine.cc (982 lines of code) (raw):

// // Copyright 2020 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // #include "backend/query/query_engine.h" #include <algorithm> #include <cassert> #include <cstddef> #include <cstdint> #include <cstdio> #include <map> #include <memory> #include <optional> #include <set> #include <string> #include <tuple> #include <utility> #include <vector> #include "google/spanner/admin/database/v1/common.pb.h" #include "google/spanner/v1/spanner.pb.h" #include "zetasql/public/analyzer.h" #include "zetasql/public/analyzer_options.h" #include "zetasql/public/catalog.h" #include "zetasql/public/evaluator.h" #include "zetasql/public/evaluator_table_iterator.h" #include "zetasql/public/language_options.h" #include "zetasql/public/options.pb.h" #include "zetasql/public/parse_helpers.h" #include "zetasql/public/type.h" #include "zetasql/public/types/type_factory.h" #include "zetasql/public/value.h" #include "zetasql/resolved_ast/resolved_ast.h" #include "zetasql/resolved_ast/resolved_node.h" #include "zetasql/resolved_ast/resolved_node_kind.pb.h" #include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_set.h" #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/strings/cord.h" #include "absl/strings/match.h" #include "absl/strings/substitute.h" #include "absl/time/clock.h" #include "absl/time/time.h" #include "backend/access/read.h" #include "backend/access/write.h" #include "backend/common/case.h" #include "backend/datamodel/key.h" #include "backend/datamodel/key_set.h" #include "backend/datamodel/value.h" #include "backend/query/analyzer_options.h" #include "backend/query/ann_functions_rewriter.h" #include "backend/query/ann_validator.h" #include "backend/query/catalog.h" #include "backend/query/change_stream/change_stream_query_validator.h" #include "backend/query/dml_query_validator.h" #include "backend/query/feature_filter/query_size_limits_checker.h" #include "backend/query/function_catalog.h" #include "backend/query/hint_rewriter.h" #include "backend/query/index_hint_validator.h" #include "backend/query/partitionability_validator.h" #include "backend/query/partitioned_dml_validator.h" #include "backend/query/query_context.h" #include "backend/query/query_engine_options.h" #include "backend/query/query_validator.h" #include "backend/query/queryable_column.h" #include "backend/query/queryable_view.h" #include "backend/schema/catalog/schema.h" #include "backend/transaction/commit_timestamp.h" #include "common/config.h" #include "common/constants.h" #include "common/errors.h" #include "common/feature_flags.h" #include "common/limits.h" #include "frontend/converters/values.h" #include "third_party/spanner_pg/interface/emulator_parser.h" #include "third_party/spanner_pg/interface/pg_arena.h" #include "third_party/spanner_pg/shims/memory_context_pg_arena.h" #include "zetasql/base/ret_check.h" #include "zetasql/base/status_macros.h" namespace google { namespace spanner { namespace emulator { namespace backend { namespace { // A RowCursor backed by vectors (one per each row) of values. class VectorsRowCursor : public RowCursor { public: VectorsRowCursor( const std::vector<std::string> column_names, const std::vector<const zetasql::Type*> column_types, const std::vector<std::vector<zetasql::Value>> column_values) : column_names_(column_names), column_types_(column_types), column_values_(column_values) { assert(column_names_.size() == column_types_.size()); for (int i = 0; i < column_values.size(); ++i) { assert(column_names_.size() == column_values[i].size()); } } bool Next() override { return ++row_index_ < column_values_.size(); } absl::Status Status() const override { return absl::OkStatus(); } int NumColumns() const override { return column_names_.size(); } const std::string ColumnName(int i) const override { return column_names_[i]; } const zetasql::Type* ColumnType(int i) const override { return column_types_[i]; } const zetasql::Value ColumnValue(int i) const override { return column_values_[row_index_][i]; } private: size_t row_index_ = -1; std::vector<std::string> column_names_; std::vector<const zetasql::Type*> column_types_; std::vector<std::vector<zetasql::Value>> column_values_; }; zetasql::EvaluatorOptions CommonEvaluatorOptions( zetasql::TypeFactory* type_factory, const std::string time_zone) { zetasql::EvaluatorOptions options; options.type_factory = type_factory; absl::TimeZone time_zone_obj; absl::LoadTimeZone(time_zone, &time_zone_obj); options.default_time_zone = time_zone_obj; options.scramble_undefined_orderings = true; return options; } absl::StatusOr<zetasql::AnalyzerOptions> MakeAnalyzerOptionsWithParameters( const zetasql::ParameterValueMap& params, const std::string time_zone) { zetasql::AnalyzerOptions options = MakeGoogleSqlAnalyzerOptions(time_zone); for (const auto& [name, value] : params) { ZETASQL_RETURN_IF_ERROR(options.AddQueryParameter(name, value.type())); } // Enable SQL Graph language feature. options.mutable_language()->EnableLanguageFeature( zetasql::FEATURE_V_1_4_SQL_GRAPH); options.mutable_language()->EnableLanguageFeature( zetasql::FEATURE_V_1_4_SQL_GRAPH_ADVANCED_QUERY); options.mutable_language()->EnableLanguageFeature( zetasql::FEATURE_V_1_4_SQL_GRAPH_BOUNDED_PATH_QUANTIFICATION); options.mutable_language()->EnableLanguageFeature( zetasql::FEATURE_V_1_4_SQL_GRAPH_PATH_TYPE); options.mutable_language()->EnableLanguageFeature( zetasql::FEATURE_V_1_4_SQL_GRAPH_PATH_MODE); return options; } // Uses googlesql/public/analyzer to build an AnalyzerOutput for an query. // We need to analyze the SQL before executing it in order to determine what // kind of statement (query or DML) it is. absl::StatusOr<std::unique_ptr<const zetasql::AnalyzerOutput>> Analyze( const std::string& sql, zetasql::Catalog* catalog, const zetasql::AnalyzerOptions& options, zetasql::TypeFactory* type_factory) { // Check the overall length of the query string. if (sql.size() > limits::kMaxQueryStringSize) { return error::QueryStringTooLong(sql.size(), limits::kMaxQueryStringSize); } std::unique_ptr<const zetasql::AnalyzerOutput> output; ZETASQL_RETURN_IF_ERROR(zetasql::AnalyzeStatement(sql, options, catalog, type_factory, &output)); return output; } absl::StatusOr<std::unique_ptr<const zetasql::AnalyzerOutput>> AnalyzePostgreSQL(const std::string& sql, zetasql::EnumerableCatalog* catalog, zetasql::AnalyzerOptions& options, zetasql::TypeFactory* type_factory, const FunctionCatalog* function_catalog) { // Check the overall length of the query string. if (sql.size() > limits::kMaxQueryStringSize) { return error::QueryStringTooLong(sql.size(), limits::kMaxQueryStringSize); } options.CreateDefaultArenasIfNotSet(); ZETASQL_ASSIGN_OR_RETURN( std::unique_ptr<postgres_translator::interfaces::PGArena> arena, postgres_translator::spangres::MemoryContextPGArena::Init(nullptr)); // PG needs ASC NULLS LAST and DESC NULLS FIRST for functions implemented as // a SQL rewrite. options.mutable_language()->EnableLanguageFeature( zetasql::FEATURE_V_1_3_NULLS_FIRST_LAST_IN_ORDER_BY); return postgres_translator::spangres::ParseAndAnalyzePostgreSQL( sql, catalog, options, type_factory, std::make_unique<FunctionCatalog>( type_factory, /*catalog_name=*/kCloudSpannerEmulatorFunctionCatalogName, /*schema=*/function_catalog->GetLatestSchema())); } // TODO : Replace with a better error transforming mechanism, // ideally hooking into ZetaSQL to control error messages. absl::Status MaybeTransformZetaSQLDMLError(absl::Status error) { // For inserts to a primary key columm. if (absl::StartsWith(error.message(), "Failed to insert row with primary key")) { absl::Status error_status(absl::StatusCode::kAlreadyExists, error.message()); // Inserting a key that already exists is a constraint error and will cause // the transaction to become invalidated. error_status.SetPayload(kConstraintError, absl::Cord("")); return error_status; } // For updates to a primary key column. if (absl::StartsWith(error.message(), "Cannot modify a primary key column")) { return absl::Status(absl::StatusCode::kInvalidArgument, error.message()); } return error; } bool IsGenerated(const zetasql::Column* column) { return column->GetAs<QueryableColumn>()->wrapped_column()->is_generated(); } struct ExecuteUpdateResult { // A mutation which describes the set of data changes required by the DML // statement. Mutation mutation; // Number of rows that were modified. int64_t modify_row_count; // Output row cursor from THEN RETURN clause. This row cursor pointer will be // NULL if there are no output rows like regular DMLs. std::unique_ptr<RowCursor> returning_row_cursor; }; absl::StatusOr<std::unique_ptr<RowCursor>> BuildReturningRowResult( std::unique_ptr<zetasql::EvaluatorTableIterator> iterator) { if (iterator == nullptr) { return nullptr; } std::vector<std::vector<zetasql::Value>> values; while (iterator->NextRow()) { values.emplace_back(); values.back().reserve(iterator->NumColumns()); for (int i = 0; i < iterator->NumColumns(); ++i) { values.back().push_back(iterator->GetValue(i)); } } ZETASQL_RETURN_IF_ERROR(iterator->Status()); std::vector<std::string> names; std::vector<const zetasql::Type*> types; for (int i = 0; i < iterator->NumColumns(); ++i) { names.push_back(iterator->GetColumnName(i)); types.push_back(iterator->GetColumnType(i)); } return std::make_unique<VectorsRowCursor>(names, types, values); } // Builds a INSERT mutation and returns it along with a count of inserted rows // and an indication whether the input to the insert statement contained any // rows or not. The latter can be used to determine whether an update count of // 0 means that an OR IGNORE clause filtered away all rows, or whether the input // query for the INSERT statement yielded zero rows. absl::StatusOr<std::tuple<Mutation, int64_t, bool>> BuildInsert( std::unique_ptr<zetasql::EvaluatorTableModifyIterator> iterator, MutationOpType op_type, const CaseInsensitiveStringSet& pending_ts_columns, bool is_upsert_query, DatabaseDialect database_dialect) { const zetasql::Table* table = iterator->table(); absl::flat_hash_set<int> generated_columns; std::vector<std::string> column_names; std::optional<std::vector<int>> key_offsets = table->PrimaryKey(); for (int i = 0; i < table->NumColumns(); ++i) { if (IsGenerated(table->GetColumn(i))) { if (key_offsets.has_value() && std::find(key_offsets->begin(), key_offsets->end(), i) != key_offsets->end()) { // TODO: b/310194797 - GSQL reference implementation returns NULL for // generated keys or columns in POSTGRES dialect. GSQL AST constructed // from Spangres transformer does not contain nodes to compute the // generated columns unlike when ZetaSQL Analyzer is used for // ZetaSQL queries. if (database_dialect == DatabaseDialect::POSTGRESQL && is_upsert_query) { return error::UnsupportedGeneratedKeyWithUpsertQueries(); } } generated_columns.insert(i); continue; } column_names.push_back(table->GetColumn(i)->Name()); } // Keys of insert rows std::set<Key> seen_keys; std::vector<ValueList> values; bool has_rows = false; while (iterator->NextRow()) { has_rows = true; values.emplace_back(); Key row_key; for (int i = 0; i < table->NumColumns(); ++i) { zetasql::Value column_value = iterator->GetColumnValue(i); if (pending_ts_columns.find(table->GetColumn(i)->Name()) != pending_ts_columns.end()) { column_value = zetasql::Value::StringValue(kCommitTimestampIdentifier); } // Build the primary key (including generated keys) for each insert row // for INSERT OR UPDATE statement to check for unsupported scenarios // below. For instance, multiple insert rows with same primary key in // INSERT OR UPDATE query is not supported. `row_key` is used to identify // this and return appropriate error. if (op_type == MutationOpType::kInsertOrUpdate && key_offsets.has_value() && std::find(key_offsets->begin(), key_offsets->end(), i) != key_offsets->end()) { row_key.AddColumn(column_value); } if (generated_columns.contains(i)) { continue; } values.back().push_back(column_value); } if (op_type == MutationOpType::kInsertOrUpdate) { // Spanner returns error when multiple insert rows have same key in // INSERT OR UPDATE statement. if (seen_keys.find(row_key) != seen_keys.end()) { return error::CannotInsertDuplicateKeyInsertOrUpdateDml( row_key.DebugString()); } else { seen_keys.insert(row_key); } } } Mutation mutation; // `values` can be empty if INSERT OR IGNORE query contains insert rows that // already existed. In this case, return empty mutation with rows modified // count as 0. if (values.empty()) { return std::make_tuple(mutation, 0, has_rows); } mutation.AddWriteOp(op_type, table->FullName(), column_names, values); return std::make_tuple(mutation, values.size(), has_rows); } // Returns true if the provided column-value pair contains a pending commit // timestamp sentinel. bool IsPendingCommitTimestampSentinel(const Schema* schema, const zetasql::Table* table, const zetasql::Column* column, const zetasql::Value& value) { return IsPendingCommitTimestamp( schema->FindTable(table->FullName())->FindColumn(column->Name()), value); } // Builds a UPDATE mutation and returns it along with a count of updated rows. std::pair<Mutation, int64_t> BuildUpdate( std::unique_ptr<zetasql::EvaluatorTableModifyIterator> iterator, MutationOpType op_type, const CaseInsensitiveStringSet& pending_ts_columns, const CaseInsensitiveStringSet& updated_columns, const Schema* schema) { const zetasql::Table* table = iterator->table(); absl::flat_hash_set<int> generated_columns; std::vector<std::string> column_names; for (int i = 0; i < table->NumColumns(); ++i) { if (IsGenerated(table->GetColumn(i))) { generated_columns.insert(i); continue; } column_names.push_back(table->GetColumn(i)->Name()); } std::vector<ValueList> values; while (iterator->NextRow()) { values.emplace_back(); for (int i = 0; i < table->NumColumns(); ++i) { if (generated_columns.contains(i)) { continue; } const zetasql::Column* column = table->GetColumn(i); zetasql::Value value = iterator->GetColumnValue(i); if (pending_ts_columns.contains(column->Name()) || // Also replace previously written commit timestamp sentinels with // the string representation. Otherwise, these previously written // sentinels will appear to ReadWriteTransaction to be user-specified // timestamps from the future (which MaybeSetCommitTimestampSentinel // will then reject). (!updated_columns.contains(column->Name()) && IsPendingCommitTimestampSentinel(schema, table, column, value))) { values.back().push_back( zetasql::Value::StringValue(kCommitTimestampIdentifier)); } else { values.back().push_back(std::move(value)); } } } Mutation mutation; mutation.AddWriteOp(op_type, table->FullName(), column_names, values); return std::make_pair(mutation, values.size()); } // Builds a DELETE mutation and returns it along with a count of deleted rows. std::pair<Mutation, int64_t> BuildDelete( std::unique_ptr<zetasql::EvaluatorTableModifyIterator> iterator) { const zetasql::Table* table = iterator->table(); KeySet key_set; if (!table->PrimaryKey().has_value() && iterator->NextRow()) { // There is no primary key in the case of a singleton table. Delete // mutation will contain an empty key set in such a case if there is a row // to be deleted. key_set.AddKey(Key{}); } else { while (iterator->NextRow()) { ValueList key_values; for (int i = 0; i < table->PrimaryKey()->size(); ++i) { key_values.push_back(iterator->GetOriginalKeyValue(i)); } key_set.AddKey(Key{key_values}); } } Mutation mutation; mutation.AddDeleteOp(table->FullName(), key_set); return std::make_pair(mutation, key_set.keys().size()); } // Returns true if the ResolvedDMLValue is a call to PENDING_COMMIT_TIMESTAMP() bool IsPendingCommitTimestamp(const zetasql::ResolvedDMLValue& dml_value) { if (dml_value.value()->node_kind() == zetasql::RESOLVED_FUNCTION_CALL) { const zetasql::ResolvedFunctionCall* fn = dml_value.value()->GetAs<zetasql::ResolvedFunctionCall>(); if (fn->function()->Name() == kPendingCommitTimestampFunctionName) { // Touch the argument list so that the ResolvedAST code does not claim we // missed it. return fn->argument_list().empty(); } } return false; } absl::StatusOr<CaseInsensitiveStringSet> PendingCommitTimestampColumnsInInsert( const std::vector<zetasql::ResolvedColumn>& insert_columns, const std::vector<std::unique_ptr<const zetasql::ResolvedInsertRow>>& insert_rows) { int64_t num_columns = insert_columns.size(); CaseInsensitiveStringSet pending_ts_columns; for (const auto& insert_row : insert_rows) { for (int i = 0; i < num_columns; ++i) { if (IsPendingCommitTimestamp(*insert_row->value_list()[i])) { pending_ts_columns.insert(insert_columns.at(i).name()); } } } for (const auto& insert_row : insert_rows) { for (int i = 0; i < num_columns; ++i) { const auto& dml_value = insert_row->value_list()[i]; const auto& column_name = insert_columns.at(i).name(); if (pending_ts_columns.find(column_name) != pending_ts_columns.end() && !IsPendingCommitTimestamp(*dml_value)) { return error::PendingCommitTimestampAllOrNone(i + 1); } } } return pending_ts_columns; } absl::StatusOr<CaseInsensitiveStringSet> PendingCommitTimestampColumnsInUpdate( const std::vector<std::unique_ptr<const zetasql::ResolvedUpdateItem>>& update_item_list) { CaseInsensitiveStringSet pending_ts_columns; for (const auto& update_item : update_item_list) { if (update_item->set_value() && IsPendingCommitTimestamp(*update_item->set_value())) { std::string column_name = update_item->target() ->GetAs<zetasql::ResolvedColumnRef>() ->column() .name(); pending_ts_columns.insert(column_name); } } return pending_ts_columns; } // Extracts the set of column names being updated. absl::StatusOr<CaseInsensitiveStringSet> ColumnsInUpdate( const std::vector<std::unique_ptr<const zetasql::ResolvedUpdateItem>>& update_item_list) { CaseInsensitiveStringSet columns; for (const auto& update_item : update_item_list) { if (update_item->set_value()) { std::vector<const zetasql::ResolvedNode*> column_refs; update_item->target()->GetDescendantsWithKinds( {zetasql::RESOLVED_COLUMN_REF}, &column_refs); ZETASQL_RET_CHECK_EQ(column_refs.size(), 1); std::string column_name = column_refs[0] ->GetAs<zetasql::ResolvedColumnRef>() ->column() .name(); columns.insert(std::move(column_name)); } } return columns; } absl::StatusOr<ExecuteUpdateResult> EvaluateResolvedInsert( const zetasql::ResolvedInsertStmt* insert_statement, const zetasql::ParameterValueMap& parameters, zetasql::TypeFactory* type_factory, DatabaseDialect database_dialect, const std::string time_zone) { if (insert_statement->insert_mode() == zetasql::ResolvedInsertStmt::OR_REPLACE) { return error::UnsupportedUpsertQueries("Insert or replace"); } bool is_upsert_query = insert_statement->insert_mode() == zetasql::ResolvedInsertStmt::OR_IGNORE || insert_statement->insert_mode() == zetasql::ResolvedInsertStmt::OR_UPDATE; MutationOpType op_type = MutationOpType::kInsert; if (is_upsert_query) { std::string insert_mode_name = (insert_statement->insert_mode() == zetasql::ResolvedInsertStmt::OR_IGNORE) ? "Insert or ignore" : "Insert or update"; if (!EmulatorFeatureFlags::instance().flags().enable_upsert_queries) { return error::UnsupportedUpsertQueries(insert_mode_name); } if (!EmulatorFeatureFlags::instance() .flags() .enable_upsert_queries_with_returning && insert_statement->returning() != nullptr) { return error::UnsupportedReturningWithUpsertQueries(insert_mode_name); } if (insert_statement->insert_mode() == zetasql::ResolvedInsertStmt::OR_UPDATE) { op_type = MutationOpType::kInsertOrUpdate; } } ZETASQL_ASSIGN_OR_RETURN(auto pending_ts_columns, PendingCommitTimestampColumnsInInsert( insert_statement->insert_column_list(), insert_statement->row_list())); auto prepared_insert = std::make_unique<zetasql::PreparedModify>( insert_statement, CommonEvaluatorOptions(type_factory, time_zone)); ZETASQL_ASSIGN_OR_RETURN(auto analyzer_options, MakeAnalyzerOptionsWithParameters(parameters, time_zone)); ZETASQL_RETURN_IF_ERROR(prepared_insert->Prepare(analyzer_options)); std::unique_ptr<zetasql::EvaluatorTableIterator> returning_iter; // `returning_iter` can be NULL if there is no THEN RETURN clause. auto status_or = prepared_insert->Execute(parameters, {}, &returning_iter); if (!status_or.ok()) { return MaybeTransformZetaSQLDMLError(status_or.status()); } auto iterator = std::move(status_or).value(); ZETASQL_ASSIGN_OR_RETURN(auto cursor, BuildReturningRowResult(std::move(returning_iter))); ZETASQL_ASSIGN_OR_RETURN(const auto& mutation_and_count, BuildInsert(std::move(iterator), op_type, pending_ts_columns, is_upsert_query, database_dialect)); // Resulting mutation count can be 0 only if all insert rows already // existed and none were inserted due to OR_IGNORE insert mode, or if the // insert statement used a SELECT statement with a WHERE clause. // Validate the invariants if either the count is zero or result mutations are // empty. if (std::get<2>(mutation_and_count) && (std::get<1>(mutation_and_count) == 0 || std::get<0>(mutation_and_count).ops().empty())) { ZETASQL_RET_CHECK(insert_statement->insert_mode() == zetasql::ResolvedInsertStmt::OR_IGNORE); ZETASQL_RET_CHECK(std::get<0>(mutation_and_count).ops().empty()); ZETASQL_RET_CHECK_EQ(std::get<1>(mutation_and_count), 0); } return ExecuteUpdateResult{std::get<0>(mutation_and_count), std::get<1>(mutation_and_count), std::move(cursor)}; } absl::StatusOr<ExecuteUpdateResult> EvaluateResolvedUpdate( const zetasql::ResolvedUpdateStmt* update_statement, const zetasql::ParameterValueMap& parameters, zetasql::TypeFactory* type_factory, const Schema* schema, const std::string time_zone) { ZETASQL_ASSIGN_OR_RETURN(auto updated_columns, ColumnsInUpdate(update_statement->update_item_list())); ZETASQL_ASSIGN_OR_RETURN(auto pending_ts_columns, PendingCommitTimestampColumnsInUpdate( update_statement->update_item_list())); auto prepared_update = std::make_unique<zetasql::PreparedModify>( update_statement, CommonEvaluatorOptions(type_factory, time_zone)); ZETASQL_ASSIGN_OR_RETURN(auto analyzer_options, MakeAnalyzerOptionsWithParameters(parameters, time_zone)); ZETASQL_RETURN_IF_ERROR(prepared_update->Prepare(analyzer_options)); std::unique_ptr<zetasql::EvaluatorTableIterator> returning_iter; auto status_or = prepared_update->Execute(parameters, {}, &returning_iter); if (!status_or.ok()) { return MaybeTransformZetaSQLDMLError(status_or.status()); } auto iterator = std::move(status_or).value(); ZETASQL_ASSIGN_OR_RETURN(auto cursor, BuildReturningRowResult(std::move(returning_iter))); const auto& mutation_and_count = BuildUpdate(std::move(iterator), MutationOpType::kUpdate, pending_ts_columns, updated_columns, schema); return ExecuteUpdateResult{mutation_and_count.first, mutation_and_count.second, std::move(cursor)}; } absl::StatusOr<ExecuteUpdateResult> EvaluateResolvedDelete( const zetasql::ResolvedDeleteStmt* delete_statement, const zetasql::ParameterValueMap& parameters, zetasql::TypeFactory* type_factory, const std::string time_zone) { auto prepared_delete = std::make_unique<zetasql::PreparedModify>( delete_statement, CommonEvaluatorOptions(type_factory, time_zone)); ZETASQL_ASSIGN_OR_RETURN(auto analyzer_options, MakeAnalyzerOptionsWithParameters(parameters, time_zone)); ZETASQL_RETURN_IF_ERROR(prepared_delete->Prepare(analyzer_options)); std::unique_ptr<zetasql::EvaluatorTableIterator> returning_iter; ZETASQL_ASSIGN_OR_RETURN(auto iterator, prepared_delete->Execute(parameters, {}, &returning_iter)); ZETASQL_ASSIGN_OR_RETURN(auto cursor, BuildReturningRowResult(std::move(returning_iter))); const auto& mutation_and_count = BuildDelete(std::move(iterator)); return ExecuteUpdateResult{mutation_and_count.first, mutation_and_count.second, std::move(cursor)}; } // Uses googlesql/public/evaluator to evaluate a DML statement represented by a // resolved AST and returns a pair of mutation and count of modified rows. absl::StatusOr<ExecuteUpdateResult> EvaluateUpdate( const zetasql::ResolvedStatement* resolved_statement, zetasql::Catalog* catalog, const zetasql::ParameterValueMap& parameters, zetasql::TypeFactory* type_factory, DatabaseDialect database_dialect, const Schema* schema, const std::string time_zone) { switch (resolved_statement->node_kind()) { case zetasql::RESOLVED_INSERT_STMT: return EvaluateResolvedInsert( resolved_statement->GetAs<zetasql::ResolvedInsertStmt>(), parameters, type_factory, database_dialect, time_zone); case zetasql::RESOLVED_UPDATE_STMT: return EvaluateResolvedUpdate( resolved_statement->GetAs<zetasql::ResolvedUpdateStmt>(), parameters, type_factory, schema, time_zone); case zetasql::RESOLVED_DELETE_STMT: return EvaluateResolvedDelete( resolved_statement->GetAs<zetasql::ResolvedDeleteStmt>(), parameters, type_factory, time_zone); default: ZETASQL_RET_CHECK_FAIL() << "Unsupported support node kind " << ResolvedNodeKind_Name( resolved_statement->node_kind()); break; } } absl::StatusOr<std::unique_ptr<RowCursor>> ResolveCallStatement() { std::vector<std::string> names; std::vector<const zetasql::Type*> types; std::vector<std::vector<zetasql::Value>> values; names.push_back("result"); types.push_back(zetasql::types::BoolType()); return std::make_unique<VectorsRowCursor>(names, types, values); } // Uses googlesql/public/evaluator to evaluate a query statement represented by // a resolved AST and returns a row cursor. absl::StatusOr<std::unique_ptr<RowCursor>> EvaluateQuery( const zetasql::ResolvedStatement* resolved_statement, const zetasql::ParameterValueMap& params, zetasql::TypeFactory* type_factory, int64_t* num_output_rows, const v1::ExecuteSqlRequest_QueryMode query_mode, const std::string time_zone) { if (resolved_statement->node_kind() == zetasql::RESOLVED_CALL_STMT) { // Evaluation of a CALL statement is currently a no-op. This is added to // ensure the emulator doesn't error out when the customer tries the CALL // statement. return ResolveCallStatement(); } ZETASQL_RET_CHECK_EQ(resolved_statement->node_kind(), zetasql::RESOLVED_QUERY_STMT) << "input is not a query statement"; auto prepared_query = std::make_unique<zetasql::PreparedQuery>( resolved_statement->GetAs<zetasql::ResolvedQueryStmt>(), CommonEvaluatorOptions(type_factory, time_zone)); // Call PrepareQuery to set the AnalyzerOptions that we used to Analyze the // statement. ZETASQL_ASSIGN_OR_RETURN(auto analyzer_options, MakeAnalyzerOptionsWithParameters(params, time_zone)); ZETASQL_RETURN_IF_ERROR(prepared_query->Prepare(analyzer_options)); // Get the query metadata from the prepared query. std::vector<std::string> names; std::vector<const zetasql::Type*> types; auto columns = prepared_query->GetColumns(); for (auto& column : columns) { names.push_back(column.first); types.push_back(column.second); } std::vector<std::vector<zetasql::Value>> values; if (query_mode == v1::ExecuteSqlRequest::PLAN) { // Return the query metadata when the query is executed in PLAN mode. // This allows clients to use PLAN to get the metadata of the query without // having to execute the query and/or supply values for all query // parameters. This is used by some drivers (e.g. JDBC) and by PGAdapter. return std::make_unique<VectorsRowCursor>(names, types, values); } else { // Finally execute the query. ZETASQL_ASSIGN_OR_RETURN(auto iterator, prepared_query->Execute(params)); while (iterator->NextRow()) { values.emplace_back(); values.back().reserve(iterator->NumColumns()); for (int i = 0; i < iterator->NumColumns(); ++i) { values.back().push_back(iterator->GetValue(i)); } } ZETASQL_RETURN_IF_ERROR(iterator->Status()); *num_output_rows = values.size(); return std::make_unique<VectorsRowCursor>(names, types, values); } } absl::StatusOr<std::map<std::string, zetasql::Value>> ExtractParameters( const Query& query, const zetasql::AnalyzerOutput* analyzer_output) { // Allow the loop below to look up undeclared parameters without worrying // about case. ZetaSQL will return the undeclared parameters using the // spelling provided in the query, but the undeclared_params map uses the // spelling from the query params section of the request. // // For example: // SELECT CAST(@pBool AS bool) with a supplied parameter "pbool". // // Do this here instead of the frontend so that the frontend does not need to // know any details about case normalization. // // This map takes pointers to elements inside of query.undeclared_params, // which is const so the pointers should not be invalidated. CaseInsensitiveStringMap<const google::protobuf::Value*> undeclared_params; for (const auto& [name, value] : query.undeclared_params) undeclared_params[name] = &value; // Build new parameter map which includes all unresolved parameters. auto params = query.declared_params; for (const auto& [name, type] : analyzer_output->undeclared_parameters()) { auto it = undeclared_params.find(name); // ZetaSQL will return an undeclared parameter error for any parameters that // do not have values specified (ie, when it == end()). if (it != undeclared_params.end()) { auto parsed_value = frontend::ValueFromProto(*it->second, type); // If the value does not parse as the given type, the error code is // kInvalidArgument, not kFailedPrecondition, for example. if (!parsed_value.ok()) { return absl::Status(absl::StatusCode::kInvalidArgument, parsed_value.status().message()); } params[name] = parsed_value.value(); } } return params; } bool IsDMLStmt(const zetasql::ResolvedNodeKind& query_kind) { return query_kind == zetasql::RESOLVED_INSERT_STMT || query_kind == zetasql::RESOLVED_UPDATE_STMT || query_kind == zetasql::RESOLVED_DELETE_STMT; } bool IsDMLStmtWitoutSelect( const zetasql::ResolvedStatement* resolved_statement) { const zetasql::ResolvedNodeKind& query_kind = resolved_statement->node_kind(); if (!IsDMLStmt(query_kind)) return false; if (query_kind == zetasql::RESOLVED_INSERT_STMT && resolved_statement->GetAs<zetasql::ResolvedInsertStmt>()->query() != nullptr) { // This is an INSERT SELECT query. return false; } return true; } absl::StatusOr<std::unique_ptr<zetasql::ResolvedStatement>> ExtractValidatedResolvedStatementAndOptions( const zetasql::AnalyzerOutput* analyzer_output, const QueryContext& context, bool in_partition_query = false, QueryEngineOptions* query_engine_options = nullptr) { ZETASQL_RET_CHECK_NE(analyzer_output->resolved_statement(), nullptr); // Rewrite query hints to use only the 'spanner' prefix. HintRewriter rewriter; ZETASQL_RETURN_IF_ERROR(analyzer_output->resolved_statement()->Accept(&rewriter)); ZETASQL_ASSIGN_OR_RETURN(auto statement, rewriter.ConsumeRootNode<zetasql::ResolvedStatement>()); // Validate the query and extract and return any options specified // through hint if the caller requested them. QueryEngineOptions options; std::unique_ptr<QueryValidator> query_validator = IsDMLStmtWitoutSelect(analyzer_output->resolved_statement()) ? std::make_unique<DMLQueryValidator>(context, &options) : std::make_unique<QueryValidator>(context, &options); // In Cloud Spanner, batch query is using PartitionQuery function. query_validator->set_in_partition_query(in_partition_query); ZETASQL_RETURN_IF_ERROR(statement->Accept(query_validator.get())); if (query_engine_options != nullptr) { *query_engine_options = options; } // Validate the index hints. bool allow_search_indexes_in_transaction = IsSearchQueryAllowed(&options, context); bool in_select_for_update_query = IsSelectForUpdateQuery(*(analyzer_output->resolved_statement())); IndexHintValidator index_hint_validator{ context.schema, options.disable_query_null_filtered_index_check || config::disable_query_null_filtered_index_check(), allow_search_indexes_in_transaction, in_partition_query, in_select_for_update_query}; ZETASQL_RETURN_IF_ERROR(statement->Accept(&index_hint_validator)); ANNFunctionsRewriter ann_functions_rewriter; ZETASQL_RETURN_IF_ERROR(statement->Accept(&ann_functions_rewriter)); ZETASQL_ASSIGN_OR_RETURN( statement, ann_functions_rewriter.ConsumeRootNode<zetasql::ResolvedStatement>()); if (!ann_functions_rewriter.ann_functions().empty()) { ANNValidator ann_validator(context.schema); ZETASQL_RETURN_IF_ERROR(statement->Accept(&ann_validator)); // Check if all the ANN functions passed the validation. for (const auto& ann_function : ann_functions_rewriter.ann_functions()) { if (!ann_validator.ann_functions().contains(ann_function)) { return error::ApproxDistanceInvalidShape( ann_function->function()->Name()); } } } // Check the query size limits // https://cloud.google.com/spanner/quotas#query_limits QuerySizeLimitsChecker checker; ZETASQL_RETURN_IF_ERROR(checker.CheckQueryAgainstLimits(statement.get())); return statement; } // Implements ResolvedASTVisitor to get the target table that various DML // statements modify. class ExtractDmlTargetTableVisitor : public zetasql::ResolvedASTVisitor { public: std::optional<std::string> target_table() const { return target_table_; } private: absl::Status VisitResolvedInsertStmt( const zetasql::ResolvedInsertStmt* node) override { target_table_ = node->table_scan()->table()->Name(); return absl::OkStatus(); } absl::Status VisitResolvedDeleteStmt( const zetasql::ResolvedDeleteStmt* node) override { target_table_ = node->table_scan()->table()->Name(); return absl::OkStatus(); } absl::Status VisitResolvedUpdateStmt( const zetasql::ResolvedUpdateStmt* node) override { target_table_ = node->table_scan()->table()->Name(); return absl::OkStatus(); } std::optional<std::string> target_table_; }; // A QueryEvaluator instance against a specific QueryEngine and QueryContext. class QueryEvaluatorForEngine : public QueryEvaluator { public: QueryEvaluatorForEngine(const QueryEngine& query_engine, const QueryContext& query_context) : query_engine_(query_engine), query_context_(query_context) {} ~QueryEvaluatorForEngine() override = default; absl::StatusOr<std::unique_ptr<RowCursor>> Evaluate( const std::string& query) override { Query q{/*sql=*/query, /*declared_params=*/{}, /*undeclared_params=*/{}}; ZETASQL_ASSIGN_OR_RETURN(auto result, query_engine_.ExecuteSql(q, query_context_, v1::ExecuteSqlRequest::NORMAL)); return std::move(result.rows); } private: const QueryEngine& query_engine_; const QueryContext& query_context_; }; } // namespace absl::StatusOr<std::string> QueryEngine::GetDmlTargetTable( const Query& query, const Schema* schema) const { ZETASQL_ASSIGN_OR_RETURN(auto analyzer_options, MakeAnalyzerOptionsWithParameters( query.declared_params, GetTimeZone(function_catalog_.GetLatestSchema()))); analyzer_options.set_prune_unused_columns(true); Catalog catalog(schema, &function_catalog_, type_factory_, analyzer_options); ZETASQL_ASSIGN_OR_RETURN( auto analyzer_output, Analyze(query.sql, &catalog, analyzer_options, type_factory_)); ZETASQL_ASSIGN_OR_RETURN(auto params, ExtractParameters(query, analyzer_output.get())); ZETASQL_ASSIGN_OR_RETURN(auto statement, ExtractValidatedResolvedStatementAndOptions( analyzer_output.get(), {.schema = schema})); ExtractDmlTargetTableVisitor visitor; ZETASQL_RETURN_IF_ERROR(statement->Accept(&visitor)); if (!visitor.target_table()) { return absl::InvalidArgumentError(absl::Substitute( "The given query does not contain a DML statement: $0", query.sql)); } return *visitor.target_table(); } absl::StatusOr<QueryResult> QueryEngine::ExecuteSql( const Query& query, const QueryContext& context) const { return ExecuteSql(query, context, v1::ExecuteSqlRequest::NORMAL); } std::string QueryEngine::GetTimeZone(const Schema* schema) { std::string time_zone = kDefaultTimeZone; if (schema != nullptr) { time_zone = schema->default_time_zone(); } return time_zone; } absl::StatusOr<QueryResult> QueryEngine::ExecuteSql( const Query& query, const QueryContext& context, v1::ExecuteSqlRequest_QueryMode query_mode) const { absl::Time start_time = absl::Now(); ZETASQL_ASSIGN_OR_RETURN(auto analyzer_options, MakeAnalyzerOptionsWithParameters( query.declared_params, GetTimeZone(function_catalog_.GetLatestSchema()))); analyzer_options.set_prune_unused_columns(true); QueryEvaluatorForEngine view_evaluator(*this, context); Catalog catalog{ context.schema, &function_catalog_, type_factory_, analyzer_options, context.reader, &view_evaluator, query.change_stream_internal_lookup, }; std::unique_ptr<const zetasql::AnalyzerOutput> analyzer_output; if (context.schema->dialect() == database_api::DatabaseDialect::POSTGRESQL && !query.change_stream_internal_lookup.has_value()) { ZETASQL_ASSIGN_OR_RETURN(analyzer_output, AnalyzePostgreSQL(query.sql, &catalog, analyzer_options, type_factory_, &function_catalog_)); } else { ZETASQL_ASSIGN_OR_RETURN(analyzer_output, Analyze(query.sql, &catalog, analyzer_options, type_factory_)); } ZETASQL_ASSIGN_OR_RETURN(auto params, ExtractParameters(query, analyzer_output.get())); ZETASQL_ASSIGN_OR_RETURN(auto resolved_statement, ExtractValidatedResolvedStatementAndOptions( analyzer_output.get(), context)); // Change stream queries are not directly executed via this generic ExecuteSql // function in query engine. If a change stream query reaches here, it is from // an incorrect API(only ExecuteStreamingSql is allowed). ChangeStreamQueryValidator validator{ context.schema, start_time, absl::flat_hash_map<std::string, zetasql::Value>(params.begin(), params.end())}; ZETASQL_ASSIGN_OR_RETURN(auto is_change_stream, validator.IsChangeStreamQuery(resolved_statement.get())); if (is_change_stream) { return error::ChangeStreamQueriesMustBeStreaming(); } QueryResult result; if (!IsDMLStmt(analyzer_output->resolved_statement()->node_kind())) { ZETASQL_ASSIGN_OR_RETURN( auto cursor, EvaluateQuery(resolved_statement.get(), params, type_factory_, &result.num_output_rows, query_mode, GetTimeZone(function_catalog_.GetLatestSchema()))); result.rows = std::move(cursor); } else { ZETASQL_RET_CHECK_NE(context.writer, nullptr); analyzer_options.set_prune_unused_columns(false); if (context.schema->dialect() == database_api::DatabaseDialect::POSTGRESQL) { ZETASQL_ASSIGN_OR_RETURN(analyzer_output, AnalyzePostgreSQL(query.sql, &catalog, analyzer_options, type_factory_, &function_catalog_)); } else { ZETASQL_ASSIGN_OR_RETURN( analyzer_output, Analyze(query.sql, &catalog, analyzer_options, type_factory_)); } ZETASQL_ASSIGN_OR_RETURN(resolved_statement, ExtractValidatedResolvedStatementAndOptions( analyzer_output.get(), context)); // Only execute the SQL statement if the user did not request PLAN mode. if (query_mode != v1::ExecuteSqlRequest::PLAN) { ZETASQL_ASSIGN_OR_RETURN( auto execute_update_result, EvaluateUpdate(resolved_statement.get(), &catalog, params, type_factory_, context.schema->dialect(), context.schema, GetTimeZone(function_catalog_.GetLatestSchema()))); ZETASQL_RETURN_IF_ERROR(context.writer->Write(execute_update_result.mutation)); result.modified_row_count = execute_update_result.modify_row_count; result.rows = std::move(execute_update_result.returning_row_cursor); } else { // Add the columns and types of the returning clause to the result. auto returning_clause = GetReturningClause(resolved_statement.get()); if (returning_clause != nullptr) { std::vector<std::string> names; std::vector<const zetasql::Type*> types; for (auto& column : returning_clause->output_column_list()) { names.push_back(column->column().name()); types.push_back(column->column().type()); } std::vector<std::vector<zetasql::Value>> values; result.rows = std::make_unique<VectorsRowCursor>(names, types, values); } } } // Add both undeclared and declared parameters to the result. result.parameter_types = analyzer_output->undeclared_parameters(); for (auto const& param : query.declared_params) { result.parameter_types.insert({param.first, param.second.type()}); } result.elapsed_time = absl::Now() - start_time; return result; } const zetasql::ResolvedReturningClause* GetReturningClause( const zetasql::ResolvedStatement* resolved_statement) { const zetasql::ResolvedReturningClause* returning_clause; switch (resolved_statement->node_kind()) { case zetasql::RESOLVED_INSERT_STMT: returning_clause = resolved_statement->GetAs<zetasql::ResolvedInsertStmt>() ->returning(); break; case zetasql::RESOLVED_UPDATE_STMT: returning_clause = resolved_statement->GetAs<zetasql::ResolvedUpdateStmt>() ->returning(); break; case zetasql::RESOLVED_DELETE_STMT: returning_clause = resolved_statement->GetAs<zetasql::ResolvedDeleteStmt>() ->returning(); break; default: returning_clause = nullptr; } return returning_clause; } absl::Status QueryEngine::IsPartitionable(const Query& query, const QueryContext& context) const { ZETASQL_ASSIGN_OR_RETURN(auto analyzer_options, MakeAnalyzerOptionsWithParameters( query.declared_params, GetTimeZone(function_catalog_.GetLatestSchema()))); analyzer_options.set_prune_unused_columns(true); Catalog catalog{context.schema, &function_catalog_, type_factory_, analyzer_options}; std::unique_ptr<const zetasql::AnalyzerOutput> analyzer_output; if (context.schema->dialect() == database_api::DatabaseDialect::POSTGRESQL) { ZETASQL_ASSIGN_OR_RETURN(analyzer_output, AnalyzePostgreSQL(query.sql, &catalog, analyzer_options, type_factory_, &function_catalog_)); } else { ZETASQL_ASSIGN_OR_RETURN(analyzer_output, Analyze(query.sql, &catalog, analyzer_options, type_factory_)); } QueryEngineOptions options; ZETASQL_ASSIGN_OR_RETURN(auto resolved_statement, ExtractValidatedResolvedStatementAndOptions( analyzer_output.get(), context, /*in_partition_query=*/true, &options)); if (options.disable_query_partitionability_check) { return absl::OkStatus(); } // Perform partitionability checks on the query PartitionabilityValidator part_validator{context.schema}; return resolved_statement->Accept(&part_validator); } absl::Status QueryEngine::IsValidPartitionedDML( const Query& query, const QueryContext& context) const { if (!IsDMLQuery(query.sql)) { return error::InvalidOperationUsingPartitionedDmlTransaction(); } ZETASQL_ASSIGN_OR_RETURN(auto analyzer_options, MakeAnalyzerOptionsWithParameters( query.declared_params, GetTimeZone(function_catalog_.GetLatestSchema()))); analyzer_options.set_prune_unused_columns(true); Catalog catalog{context.schema, &function_catalog_, type_factory_, analyzer_options}; ZETASQL_ASSIGN_OR_RETURN( auto analyzer_output, Analyze(query.sql, &catalog, analyzer_options, type_factory_)); ZETASQL_ASSIGN_OR_RETURN(auto resolved_statement, ExtractValidatedResolvedStatementAndOptions( analyzer_output.get(), context, /*in_partition_query=*/true)); // Check that the DML statement is partitionable. PartitionedDMLValidator validator; return resolved_statement->Accept(&validator); } bool IsDMLQuery(const std::string& query) { zetasql::ResolvedNodeKind query_kind = zetasql::GetStatementKind(query); return IsDMLStmt(query_kind); } absl::StatusOr<ChangeStreamQueryValidator::ChangeStreamMetadata> QueryEngine::TryGetChangeStreamMetadata(const Query& query, const Schema* schema, bool in_read_write_txn) { const absl::Time start_time = absl::Now(); ZETASQL_ASSIGN_OR_RETURN(auto analyzer_options, MakeAnalyzerOptionsWithParameters(query.declared_params, GetTimeZone(schema))); analyzer_options.set_prune_unused_columns(true); zetasql::TypeFactory type_factory; FunctionCatalog function_catalog( &type_factory, kCloudSpannerEmulatorFunctionCatalogName, schema); Catalog catalog{schema, &function_catalog, &type_factory, analyzer_options}; std::unique_ptr<const zetasql::AnalyzerOutput> analyzer_output; if (schema->dialect() == database_api::DatabaseDialect::POSTGRESQL) { ZETASQL_ASSIGN_OR_RETURN(analyzer_output, AnalyzePostgreSQL(query.sql, &catalog, analyzer_options, &type_factory, &function_catalog)); } else { ZETASQL_ASSIGN_OR_RETURN(analyzer_output, Analyze(query.sql, &catalog, analyzer_options, &type_factory)); } ZETASQL_ASSIGN_OR_RETURN(auto params, ExtractParameters(query, analyzer_output.get())); ZETASQL_ASSIGN_OR_RETURN( auto resolved_statement, ExtractValidatedResolvedStatementAndOptions( analyzer_output.get(), QueryContext{.schema = schema, .allow_read_write_only_functions = in_read_write_txn})); ChangeStreamQueryValidator validator{ schema, start_time, absl::flat_hash_map<std::string, zetasql::Value>(params.begin(), params.end())}; ZETASQL_ASSIGN_OR_RETURN(auto is_change_stream, validator.IsChangeStreamQuery(resolved_statement.get())); if (is_change_stream) { ZETASQL_RETURN_IF_ERROR(resolved_statement->Accept(&validator)); } return validator.change_stream_metadata(); } } // namespace backend } // namespace emulator } // namespace spanner } // namespace google