frontend/handlers/partitions.cc (169 lines of code) (raw):

// // Copyright 2020 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // #include <memory> #include <string> #include "google/protobuf/struct.pb.h" #include "google/spanner/v1/keys.pb.h" #include "google/spanner/v1/spanner.pb.h" #include "google/spanner/v1/transaction.pb.h" #include "google/spanner/v1/type.pb.h" #include "absl/status/status.h" #include "absl/status/statusor.h" #include "backend/query/query_engine.h" #include "common/config.h" #include "common/errors.h" #include "frontend/converters/partition.h" #include "frontend/converters/query.h" #include "frontend/converters/reads.h" #include "frontend/entities/session.h" #include "frontend/proto/partition_token.pb.h" #include "frontend/server/handler.h" #include "zetasql/base/status_macros.h" namespace google { namespace spanner { namespace emulator { namespace frontend { namespace { absl::Status ValidateTransactionSelectorForPartitionRead( const spanner_api::TransactionSelector& selector) { // PartitionRead and PartitionQuery only support read only snapshot // transactions. Read/write and single use transactions are not supported. switch (selector.selector_case()) { case spanner_api::TransactionSelector::SelectorCase::kBegin: { if (!selector.begin().has_read_only()) { return error::PartitionReadNeedsReadOnlyTxn(); } return absl::OkStatus(); } case spanner_api::TransactionSelector::SelectorCase::kId: { return absl::OkStatus(); } case spanner_api::TransactionSelector::SelectorCase::kSingleUse: return error::PartitionReadDoesNotSupportSingleUseTransaction(); case spanner_api::TransactionSelector::SELECTOR_NOT_SET: return error::MissingRequiredFieldError("TransactionSelector.selector"); } } absl::Status ValidatePartitionOptions( const spanner_api::PartitionOptions& partition_options) { if (partition_options.partition_size_bytes() < 0) { return error::InvalidBytesPerBatch("partition_options"); } if (partition_options.max_partitions() < 0) { return error::InvalidMaxPartitionCount("partition_options"); } return absl::OkStatus(); } // Create a partition token for the given partition read request and partitioned // key set. absl::StatusOr<PartitionToken> CreatePartitionTokenForRead( const google::spanner::v1::PartitionReadRequest& request, const backend::TransactionID& txn_id, const google::spanner::v1::KeySet& partitioned_key_set) { PartitionToken partition_token; *partition_token.mutable_session() = request.session(); *partition_token.mutable_transaction_id() = std::to_string(txn_id); auto read_params = partition_token.mutable_read_params(); *read_params->mutable_table() = request.table(); *read_params->mutable_index() = request.index(); *read_params->mutable_key_set() = request.key_set(); *read_params->mutable_columns() = request.columns(); *partition_token.mutable_partitioned_key_set() = partitioned_key_set; return partition_token; } // Create a partition token for the given partition query request. absl::StatusOr<PartitionToken> CreatePartitionTokenForQuery( const google::spanner::v1::PartitionQueryRequest& request, const backend::TransactionID& txn_id, bool empty_partition) { if (request.sql().empty()) { return error::MissingRequiredFieldError("sql"); } PartitionToken partition_token; *partition_token.mutable_session() = request.session(); *partition_token.mutable_transaction_id() = std::to_string(txn_id); auto query_params = partition_token.mutable_query_params(); *query_params->mutable_sql() = request.sql(); *query_params->mutable_params() = request.params(); *query_params->mutable_param_types() = request.param_types(); partition_token.set_empty_query_partition(empty_partition); return partition_token; } } // namespace // Creates a set of partition tokens for executing parallel read operations. absl::Status PartitionRead(RequestContext* ctx, const spanner_api::PartitionReadRequest* request, spanner_api::PartitionResponse* response) { // Take shared ownerships of session and transaction so that they will keep // valid throughout this function. ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Session> session, GetSession(ctx, request->session())); // Get underlying transaction. ZETASQL_RETURN_IF_ERROR( ValidateTransactionSelectorForPartitionRead(request->transaction())); ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Transaction> txn, session->FindOrInitTransaction(request->transaction())); if (!txn->IsReadOnly()) { return error::PartitionReadNeedsReadOnlyTxn(); } if (request->has_partition_options()) { ZETASQL_RETURN_IF_ERROR(ValidatePartitionOptions(request->partition_options())); } if (ShouldReturnTransaction(request->transaction())) { ZETASQL_ASSIGN_OR_RETURN(*response->mutable_transaction(), txn->ToProto()); } // Add two partitions to result set, with first partition being empty. ZETASQL_ASSIGN_OR_RETURN( auto empty_partition_token, CreatePartitionTokenForRead(*request, txn->id(), spanner_api::KeySet())); spanner_api::Partition empty_partition; ZETASQL_ASSIGN_OR_RETURN(*empty_partition.mutable_partition_token(), PartitionTokenToString(empty_partition_token)); // Second partition contains full result set for requested key_set. ZETASQL_ASSIGN_OR_RETURN( auto full_partition_token, CreatePartitionTokenForRead(*request, txn->id(), request->key_set())); spanner_api::Partition full_partition; ZETASQL_ASSIGN_OR_RETURN(*full_partition.mutable_partition_token(), PartitionTokenToString(full_partition_token)); *response->mutable_partitions()->Add() = empty_partition; *response->mutable_partitions()->Add() = full_partition; return absl::OkStatus(); } REGISTER_GRPC_HANDLER(Spanner, PartitionRead); // Creates a set of partition tokens for executing parallel query operations. absl::Status PartitionQuery(RequestContext* ctx, const spanner_api::PartitionQueryRequest* request, spanner_api::PartitionResponse* response) { // Take shared ownerships of session and transaction so that they will keep // valid throughout this function. ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Session> session, GetSession(ctx, request->session())); // Get underlying transaction. ZETASQL_RETURN_IF_ERROR( ValidateTransactionSelectorForPartitionRead(request->transaction())); ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Transaction> txn, session->FindOrInitTransaction(request->transaction())); if (!txn->IsReadOnly()) { return error::PartitionReadNeedsReadOnlyTxn(); } if (request->has_partition_options()) { ZETASQL_RETURN_IF_ERROR(ValidatePartitionOptions(request->partition_options())); } if (ShouldReturnTransaction(request->transaction())) { ZETASQL_ASSIGN_OR_RETURN(*response->mutable_transaction(), txn->ToProto()); } // check query is partitionable. ZETASQL_ASSIGN_OR_RETURN( backend::Query query, QueryFromProto(request->sql(), request->params(), request->param_types(), txn->query_engine()->type_factory() , txn->schema()->proto_bundle() )); ZETASQL_RETURN_IF_ERROR(txn->query_engine()->IsPartitionable( query, backend::QueryContext{ .schema = txn->schema(), .reader = nullptr, .writer = nullptr})); // Add two partitions to result set, with first partition being empty. ZETASQL_ASSIGN_OR_RETURN(auto empty_partition_token, CreatePartitionTokenForQuery(*request, txn->id(), /*empty_partition =*/true)); spanner_api::Partition empty_partition; ZETASQL_ASSIGN_OR_RETURN(*empty_partition.mutable_partition_token(), PartitionTokenToString(empty_partition_token)); // Second partition contains full result set for requested query. ZETASQL_ASSIGN_OR_RETURN(auto full_partition_token, CreatePartitionTokenForQuery(*request, txn->id(), /*empty_partition =*/false)); spanner_api::Partition full_partition; ZETASQL_ASSIGN_OR_RETURN(*full_partition.mutable_partition_token(), PartitionTokenToString(full_partition_token)); *response->mutable_partitions()->Add() = empty_partition; *response->mutable_partitions()->Add() = full_partition; return absl::OkStatus(); } REGISTER_GRPC_HANDLER(Spanner, PartitionQuery); } // namespace frontend } // namespace emulator } // namespace spanner } // namespace google