frontend/handlers/queries.cc (501 lines of code) (raw):
//
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <variant>
#include <vector>
#include "google/protobuf/struct.pb.h"
#include "google/spanner/v1/query_plan.pb.h"
#include "google/spanner/v1/result_set.pb.h"
#include "google/spanner/v1/spanner.pb.h"
#include "google/spanner/v1/transaction.pb.h"
#include "zetasql/public/analyzer_options.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/cord.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include "backend/access/read.h"
#include "backend/query/change_stream/change_stream_query_validator.h"
#include "backend/query/query_engine.h"
#include "common/constants.h"
#include "common/errors.h"
#include "frontend/common/protos.h"
#include "frontend/common/validations.h"
#include "frontend/converters/partition.h"
#include "frontend/converters/query.h"
#include "frontend/converters/reads.h"
#include "frontend/converters/types.h"
#include "frontend/converters/values.h"
#include "frontend/entities/session.h"
#include "frontend/entities/transaction.h"
#include "frontend/handlers/change_streams.h"
#include "frontend/proto/partition_token.pb.h"
#include "frontend/server/handler.h"
#include "frontend/server/request_context.h"
#include "farmhash.h"
#include "absl/status/status.h"
#include "zetasql/base/status_macros.h"
namespace google {
namespace spanner {
namespace emulator {
namespace frontend {
namespace spanner_api = ::google::spanner::v1;
namespace {
// Largest amount by which a read timestamp may lie ahead of the current time
// before ValidateReadTimestampNotTooFarInFuture rejects it. Declared
// constexpr so it cannot be modified at runtime and needs no dynamic
// initialization.
constexpr absl::Duration kMaxFutureReadDuration = absl::Hours(1);
// Returns error::ReadTimestampTooFarInFuture when `read_timestamp` is more
// than kMaxFutureReadDuration ahead of `now`; returns OK otherwise.
absl::Status ValidateReadTimestampNotTooFarInFuture(absl::Time read_timestamp,
                                                    absl::Time now) {
  const absl::Duration lead = read_timestamp - now;
  if (lead <= kMaxFutureReadDuration) {
    return absl::OkStatus();
  }
  return error::ReadTimestampTooFarInFuture(read_timestamp);
}
// Rejects transaction selectors that cannot be used for the given statement.
//
// A single-use selector must carry a read-only mode. When `is_dml` is true, a
// `begin` selector must not be read-only and single-use selectors are not
// accepted at all. Returns OK when none of these checks fail.
absl::Status ValidateTransactionSelectorForQuery(
    const spanner_api::TransactionSelector& selector, bool is_dml) {
  const bool is_single_use =
      selector.selector_case() ==
      spanner_api::TransactionSelector::SelectorCase::kSingleUse;

  if (is_single_use &&
      selector.single_use().mode_case() != v1::TransactionOptions::kReadOnly) {
    return error::InvalidModeForReadOnlySingleUseTransaction();
  }

  // The remaining checks only apply to DML statements.
  if (!is_dml) {
    return absl::OkStatus();
  }

  // ReadWrite and PartitionedDML transactions are currently allowed.
  if (selector.begin().mode_case() == v1::TransactionOptions::kReadOnly) {
    return error::ReadOnlyTransactionDoesNotSupportDml("ReadOnly");
  }
  if (is_single_use) {
    return error::DmlDoesNotSupportSingleUseTransaction();
  }
  return absl::OkStatus();
}
// Checks that an ExecuteSql `request` carrying a partition token matches the
// query recorded in `partition_token`.
//
// Returns:
//   - InvalidPartitionedQueryMode if the request is not in NORMAL query mode.
//   - ReadFromDifferentSession if the token's session differs from the
//     request's session.
//   - ReadFromDifferentTransaction if the request does not select a
//     transaction by id, or that id differs from the token's transaction id.
//   - ReadFromDifferentParameters if the token has no query params, or the
//     sql text, parameter values or parameter types differ.
//   - OK otherwise.
absl::Status ValidatePartitionToken(
    const PartitionToken& partition_token,
    const spanner_api::ExecuteSqlRequest* request) {
  if (request->query_mode() != v1::ExecuteSqlRequest::NORMAL) {
    return error::InvalidPartitionedQueryMode();
  }
  if (partition_token.session() != request->session()) {
    return error::ReadFromDifferentSession();
  }
  if (request->transaction().selector_case() != v1::TransactionSelector::kId ||
      partition_token.transaction_id() != request->transaction().id()) {
    return error::ReadFromDifferentTransaction();
  }
  if (!partition_token.has_query_params()) {
    return error::ReadFromDifferentParameters();
  }

  // Bind by reference; the recorded params are only read here.
  const auto& query_params = partition_token.query_params();
  if (query_params.sql() != request->sql()) {
    return error::ReadFromDifferentParameters();
  }

  // Parameter values: same number of entries, and every recorded entry must
  // be present in the request with identical serialized contents.
  const auto& request_fields = request->params().fields();
  if (query_params.params().fields_size() != request->params().fields_size()) {
    return error::ReadFromDifferentParameters();
  }
  for (const auto& field : query_params.params().fields()) {
    const auto it = request_fields.find(field.first);
    if (it == request_fields.end() ||
        field.second.SerializeAsString() != it->second.SerializeAsString()) {
      return error::ReadFromDifferentParameters();
    }
  }

  // Parameter types: compare the contents of the type messages.
  // GetTypeName() returns the protobuf message type name, which is the same
  // for every entry, so comparing it would never detect a differing type.
  const auto& request_types = request->param_types();
  if (query_params.param_types_size() != request->param_types_size()) {
    return error::ReadFromDifferentParameters();
  }
  for (const auto& param_type : query_params.param_types()) {
    const auto it = request_types.find(param_type.first);
    if (it == request_types.end() ||
        param_type.second.SerializeAsString() !=
            it->second.SerializeAsString()) {
      return error::ReadFromDifferentParameters();
    }
  }
  return absl::OkStatus();
}
// Writes two string-valued entries into `stats`: "rows_returned" (from
// result.num_output_rows) and "elapsed_time" (result.elapsed_time formatted
// with absl::FormatDuration).
void AddQueryStatsFromQueryResult(const backend::QueryResult& result,
                                  google::protobuf::Struct* stats) {
  auto& fields = *stats->mutable_fields();
  fields["rows_returned"].set_string_value(
      absl::StrCat(result.num_output_rows));
  fields["elapsed_time"].set_string_value(
      absl::FormatDuration(result.elapsed_time));
}
// Appends a single placeholder plan node, displayed as "No query plan" with
// kind KIND_UNSPECIFIED, to the query plan in `stats`.
void AddEmptyQueryPlan(v1::ResultSetStats* stats) {
  auto* placeholder = stats->mutable_query_plan()->add_plan_nodes();
  placeholder->set_display_name("No query plan");
  placeholder->set_kind(v1::PlanNode::KIND_UNSPECIFIED);
}
// Appends one field to metadata_pb->undeclared_parameters for every entry in
// `cursor`, using the map key as the field name and the mapped type converted
// with TypeToProto as the field type.
//
// Returns the TypeToProto error, annotated with the parameter name and type,
// if a conversion fails; OK otherwise.
absl::Status AddUndeclaredParametersFromQueryResult(
    zetasql::QueryParametersMap* cursor, v1::ResultSetMetadata* metadata_pb) {
  for (const auto& param : *cursor) {
    auto* field_pb = metadata_pb->mutable_undeclared_parameters()->add_fields();
    field_pb->set_name(param.first);
    // The mapped value is a type pointer; streaming it directly would print
    // an address, so render the type's debug string instead.
    ZETASQL_RETURN_IF_ERROR(TypeToProto(param.second, field_pb->mutable_type()))
        << " when converting param " << param.first << " of type "
        << (param.second != nullptr ? param.second->DebugString() : "<null>")
        << " in row cursor";
  }
  return absl::OkStatus();
}
// Builds a backend query from one ExecuteBatchDml statement (sql, params and
// param types) and executes it on `txn`. Returns the conversion error if the
// statement cannot be converted, otherwise the result of txn->ExecuteSql.
absl::StatusOr<backend::QueryResult> ExecuteQuery(
    const spanner_api::ExecuteBatchDmlRequest_Statement& statement,
    std::shared_ptr<Transaction> txn) {
  const absl::StatusOr<backend::Query> converted = QueryFromProto(
      statement.sql(), statement.params(), statement.param_types(),
      txn->query_engine()->type_factory(), txn->schema()->proto_bundle());
  if (!converted.ok()) {
    return converted.status();
  }
  return txn->ExecuteSql(*converted);
}
template <typename Request>
int64_t SerializeAndHashRequest(const Request& request) {
std::string serialized_request;
{
// Serialize the request proto deterministically.
// Message::SerializeToString() is not guaranteed to deterministically
// generate the same string for a message that contains map fields.
// We create the output stream in an inner scope so that it gets flushed
// in the destructor before computing the fingerprint.
google::protobuf::io::StringOutputStream stream(&serialized_request);
google::protobuf::io::CodedOutputStream output(&stream);
output.SetSerializationDeterministic(true);
request.SerializeToCodedStream(&output);
}
return farmhash::Fingerprint64(serialized_request);
}
// Fingerprints an ExecuteSql request. The hash is computed over a copy whose
// sequence number is zeroed and whose resume token is cleared, so neither of
// those two fields contributes to the result.
int64_t HashRequest(const spanner_api::ExecuteSqlRequest* request) {
  spanner_api::ExecuteSqlRequest normalized(*request);
  normalized.set_seqno(0);
  normalized.clear_resume_token();
  return SerializeAndHashRequest(normalized);
}
// Fingerprints an ExecuteBatchDml request. The hash is computed over a copy
// whose sequence number is zeroed, so the sequence number does not contribute
// to the result.
int64_t HashRequest(const spanner_api::ExecuteBatchDmlRequest* request) {
  spanner_api::ExecuteBatchDmlRequest normalized(*request);
  normalized.set_seqno(0);
  return SerializeAndHashRequest(normalized);
}
} // namespace
// Executes a SQL statement, returning all results in a single reply.
//
// Looks up the session named in `request`, validates the transaction selector
// for the statement kind (DML or not), finds or initializes the transaction,
// and then runs the statement inside Transaction::GuardedCall. On success
// `response` holds transaction metadata (when requested by the selector),
// undeclared parameter types, rows and/or DML row counts, and, for PLAN /
// PROFILE query modes, a placeholder query plan.
//
// For DML statements the request is first passed to
// LookupOrRegisterDmlRequest together with its sequence number and hash; if a
// recorded state comes back, the recorded status/outcome is returned and the
// statement is not executed.
absl::Status ExecuteSql(RequestContext* ctx,
const spanner_api::ExecuteSqlRequest* request,
spanner_api::ResultSet* response) {
// Take shared ownerships of session and transaction so that they will keep
// valid throughout this function.
ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Session> session,
GetSession(ctx, request->session()));
// Get underlying transaction.
bool is_dml_query = backend::IsDMLQuery(request->sql());
ZETASQL_RETURN_IF_ERROR(ValidateTransactionSelectorForQuery(request->transaction(),
is_dml_query));
ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Transaction> txn,
session->FindOrInitTransaction(request->transaction()));
ZETASQL_RETURN_IF_ERROR(
ValidateDirectedReadsOption(request->directed_read_options(), txn));
// Wrap all operations on this transaction so they are atomic.
return txn->GuardedCall(
is_dml_query ? Transaction::OpType::kDml : Transaction::OpType::kSql,
[&]() -> absl::Status {
// Register DML request and check for status replay.
if (is_dml_query) {
const auto state = txn->LookupOrRegisterDmlRequest(
request->seqno(), HashRequest(request), request->sql());
if (state.has_value()) {
// A recorded error status is returned as-is.
if (!state->status.ok()) {
return state->status;
}
// The recorded outcome is not a ResultSet, so it cannot be replayed as
// the response of this RPC.
if (!std::holds_alternative<spanner_api::ResultSet>(
state->outcome)) {
return error::ReplayRequestMismatch(request->seqno(),
request->sql());
}
// Replay the recorded response; state->status is OK at this point.
*response = std::get<spanner_api::ResultSet>(state->outcome);
return state->status;
}
// DML needs to explicitly check the transaction status since
// the DML sequence number replay should take priority over returning
// a previously encountered error status.
ZETASQL_RETURN_IF_ERROR(txn->Status());
}
// Cannot query after commit, rollback, or non-recoverable error.
if (txn->IsInvalid()) {
return error::CannotUseTransactionAfterConstraintError();
}
if (txn->IsCommitted() || txn->IsRolledback()) {
if (txn->IsPartitionedDml()) {
return error::CannotReusePartitionedDmlTransaction();
}
return error::CannotReadOrQueryAfterCommitOrRollback();
}
// Read-only transactions reject DML and reads whose timestamp is too far
// ahead of the current clock.
if (txn->IsReadOnly()) {
if (is_dml_query) {
return error::ReadOnlyTransactionDoesNotSupportDml("ReadOnly");
}
ZETASQL_ASSIGN_OR_RETURN(absl::Time read_timestamp, txn->GetReadTimestamp());
ZETASQL_RETURN_IF_ERROR(ValidateReadTimestampNotTooFarInFuture(
read_timestamp, ctx->env()->clock()->Now()));
}
// Convert and execute provided SQL statement.
ZETASQL_ASSIGN_OR_RETURN(const backend::Query query,
QueryFromProto(request->sql(), request->params(),
request->param_types(),
txn->query_engine()->type_factory(),
txn->schema()->proto_bundle()));
auto maybe_result = txn->ExecuteSql(query, request->query_mode());
if (!maybe_result.ok()) {
absl::Status error = maybe_result.status();
if (txn->IsPartitionedDml()) {
// A Partitioned DML transaction will become invalidated on any
// error.
error.SetPayload(kConstraintError, absl::Cord(""));
}
return error;
}
backend::QueryResult& result = maybe_result.value();
// Populate transaction metadata.
if (ShouldReturnTransaction(request->transaction())) {
ZETASQL_ASSIGN_OR_RETURN(*response->mutable_metadata()->mutable_transaction(),
txn->ToProto());
}
// Return query parameter types.
ZETASQL_RETURN_IF_ERROR(AddUndeclaredParametersFromQueryResult(
&result.parameter_types, response->mutable_metadata()));
if (is_dml_query) {
// Partitioned DML reports a lower bound on the modified row count; other
// DML reports the exact count.
if (txn->IsPartitionedDml()) {
response->mutable_stats()->set_row_count_lower_bound(
result.modified_row_count);
} else {
response->mutable_stats()->set_row_count_exact(
result.modified_row_count);
}
if (result.rows == nullptr) {
// Set empty row type.
response->mutable_metadata()->mutable_row_type();
} else {
// It contains DML THEN RETURN row results.
ZETASQL_RETURN_IF_ERROR(RowCursorToResultSetProto(result.rows.get(),
/*limit=*/0, response));
}
} else {
ZETASQL_RETURN_IF_ERROR(RowCursorToResultSetProto(result.rows.get(),
/*limit=*/0, response));
}
// When a partition token is supplied it is parsed and validated against
// this request. Note that this happens after the statement has already
// been executed above.
if (!request->partition_token().empty()) {
ZETASQL_ASSIGN_OR_RETURN(
auto partition_token,
PartitionTokenFromString(request->partition_token()));
ZETASQL_RETURN_IF_ERROR(ValidatePartitionToken(partition_token, request));
// A token flagged as an empty query partition returns no rows; the
// metadata and stats already written to the response are kept.
if (partition_token.empty_query_partition()) {
response->clear_rows();
}
}
// Add basic stats for PROFILE mode. We do this to interoperate with
// REPL applications written for Cloud Spanner. The profile will not
// contain statistics for plan nodes.
if (request->query_mode() == spanner_api::ExecuteSqlRequest::PROFILE) {
AddQueryStatsFromQueryResult(
result, response->mutable_stats()->mutable_query_stats());
}
// Add an empty query plan if the user requested either PLAN or PROFILE
// query mode.
if (request->query_mode() == spanner_api::ExecuteSqlRequest::PLAN ||
request->query_mode() == spanner_api::ExecuteSqlRequest::PROFILE) {
AddEmptyQueryPlan(response->mutable_stats());
}
// Hand the completed response to the transaction as the DML replay
// outcome.
if (is_dml_query) {
txn->SetDmlReplayOutcome(*response);
}
return absl::OkStatus();
});
}
REGISTER_GRPC_HANDLER(Spanner, ExecuteSql);
// Executes a SQL statement, returning all results as a stream.
//
// resume_tokens is not supported in the emulator. This implementation does not
// limit the size of the response and therefore, chunked_value will always be
// false.
//
// Looks up the session named in `request`, validates the transaction selector
// for the statement kind (DML or not), finds or initializes the transaction,
// and runs the statement inside Transaction::GuardedCall, sending the
// resulting PartialResultSet messages on `stream`. Change stream queries are
// detected inside the guarded call but executed after it returns, through
// ChangeStreamsHandler.
//
// For DML statements the request is first passed to
// LookupOrRegisterDmlRequest together with its sequence number and hash; if a
// recorded state comes back, the recorded status/outcome is returned and the
// statement is not executed.
absl::Status ExecuteStreamingSql(
RequestContext* ctx, const spanner_api::ExecuteSqlRequest* request,
ServerStream<spanner_api::PartialResultSet>* stream) {
// Take shared ownerships of session and transaction so that they will keep
// valid throughout this function.
ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Session> session,
GetSession(ctx, request->session()));
// Declared outside the guarded lambda (which captures by reference) so the
// lambda can fill it in and it can be inspected after GuardedCall returns.
// NOTE(review): if the lambda returns before assigning it, the check after
// GuardedCall reads the default-initialized value; this presumably relies on
// ChangeStreamMetadata initializing is_change_stream_query to false —
// confirm.
backend::ChangeStreamQueryValidator::ChangeStreamMetadata
change_stream_metadata;
// Get underlying transaction.
bool is_dml_query = backend::IsDMLQuery(request->sql());
ZETASQL_RETURN_IF_ERROR(ValidateTransactionSelectorForQuery(request->transaction(),
is_dml_query));
ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Transaction> txn,
session->FindOrInitTransaction(request->transaction()));
ZETASQL_RETURN_IF_ERROR(
ValidateDirectedReadsOption(request->directed_read_options(), txn));
// Wrap all operations on this transaction so they are atomic.
absl::Status status = txn->GuardedCall(
is_dml_query ? Transaction::OpType::kDml : Transaction::OpType::kSql,
[&]() -> absl::Status {
// Register DML request and check for status replay.
if (is_dml_query) {
const auto state = txn->LookupOrRegisterDmlRequest(
request->seqno(), HashRequest(request), request->sql());
if (state.has_value()) {
// A recorded error status is returned as-is.
if (!state->status.ok()) {
return state->status;
}
// The recorded outcome is not a ResultSet, so it cannot be replayed as
// the response of this RPC.
if (!std::holds_alternative<spanner_api::ResultSet>(
state->outcome)) {
return error::ReplayRequestMismatch(request->seqno(),
request->sql());
}
// Replay: send a single partial result set carrying only the stats and
// metadata of the recorded ResultSet.
spanner_api::PartialResultSet response;
spanner_api::ResultSet replay_result =
std::get<spanner_api::ResultSet>(state->outcome);
*response.mutable_stats() = replay_result.stats();
*response.mutable_metadata() = replay_result.metadata();
stream->Send(response);
return state->status;
}
// DML needs to explicitly check the transaction status since
// the DML sequence number replay should take priority over returning
// a previously encountered error status.
ZETASQL_RETURN_IF_ERROR(txn->Status());
}
// Cannot query after commit, rollback, or non-recoverable error.
if (txn->IsInvalid()) {
return error::CannotUseTransactionAfterConstraintError();
}
if (txn->IsCommitted() || txn->IsRolledback()) {
if (txn->IsPartitionedDml()) {
return error::CannotReusePartitionedDmlTransaction();
}
return error::CannotReadOrQueryAfterCommitOrRollback();
}
// Read-only transactions reject DML and reads whose timestamp is too far
// ahead of the current clock.
if (txn->IsReadOnly()) {
if (is_dml_query) {
return error::ReadOnlyTransactionDoesNotSupportDml("ReadOnly");
}
ZETASQL_ASSIGN_OR_RETURN(absl::Time read_timestamp, txn->GetReadTimestamp());
ZETASQL_RETURN_IF_ERROR(ValidateReadTimestampNotTooFarInFuture(
read_timestamp, ctx->env()->clock()->Now()));
}
// Convert and execute provided SQL statement.
ZETASQL_ASSIGN_OR_RETURN(const backend::Query query,
QueryFromProto(request->sql(), request->params(),
request->param_types(),
txn->query_engine()->type_factory(),
txn->schema()->proto_bundle()));
bool in_read_write_txn = txn->IsReadWrite() || txn->IsPartitionedDml();
ZETASQL_ASSIGN_OR_RETURN(change_stream_metadata,
backend::QueryEngine::TryGetChangeStreamMetadata(
query, txn->schema(), in_read_write_txn));
// if current query is a change stream query, return and exit current
// transaction lambda to avoid nested transaction call.
if (change_stream_metadata.is_change_stream_query) {
return absl::OkStatus();
}
auto maybe_result = txn->ExecuteSql(query, request->query_mode());
if (!maybe_result.ok()) {
absl::Status error = maybe_result.status();
if (txn->IsPartitionedDml()) {
// A Partitioned DML transaction will become invalidated on any
// error.
error.SetPayload(kConstraintError, absl::Cord(""));
}
return error;
}
backend::QueryResult& result = maybe_result.value();
std::vector<spanner_api::PartialResultSet> responses;
if (is_dml_query) {
// Start with one empty partial result set; it is replaced below when the
// DML statement produced rows.
responses.emplace_back();
if (result.rows == nullptr) {
// Set empty row type.
responses.back().mutable_metadata()->mutable_row_type();
} else {
// It contains DML THEN RETURN row results.
ZETASQL_ASSIGN_OR_RETURN(responses, RowCursorToPartialResultSetProtos(
result.rows.get(), /*limit=*/0));
}
// The modified row count is written to the last partial result set:
// a lower bound for Partitioned DML, the exact count otherwise.
if (txn->IsPartitionedDml()) {
responses.back().mutable_stats()->set_row_count_lower_bound(
result.modified_row_count);
} else {
responses.back().mutable_stats()->set_row_count_exact(
result.modified_row_count);
}
} else {
ZETASQL_ASSIGN_OR_RETURN(responses, RowCursorToPartialResultSetProtos(
result.rows.get(), /*limit=*/0));
}
// When a partition token is supplied it is parsed and validated against
// this request. Note that this happens after the statement has already
// been executed above.
if (!request->partition_token().empty()) {
ZETASQL_ASSIGN_OR_RETURN(
auto partition_token,
PartitionTokenFromString(request->partition_token()));
ZETASQL_RETURN_IF_ERROR(ValidatePartitionToken(partition_token, request));
if (partition_token.empty_query_partition()) {
// Clear all partial responses except the first one. Return only
// metadata in the first partial response.
responses.resize(1);
responses.front().clear_values();
responses.front().clear_chunked_value();
}
}
// Populate transaction metadata.
if (ShouldReturnTransaction(request->transaction())) {
ZETASQL_ASSIGN_OR_RETURN(
*responses.front().mutable_metadata()->mutable_transaction(),
txn->ToProto());
}
// Return query parameter types.
ZETASQL_RETURN_IF_ERROR(AddUndeclaredParametersFromQueryResult(
&result.parameter_types, responses.front().mutable_metadata()));
// Add basic stats for PROFILE mode. We do this to interoperate with
// REPL applications written for Cloud Spanner. The profile will not
// contain statistics for plan nodes.
if (request->query_mode() == spanner_api::ExecuteSqlRequest::PROFILE) {
AddQueryStatsFromQueryResult(
result, responses.front().mutable_stats()->mutable_query_stats());
}
// Send results back to client.
for (const auto& response : responses) {
stream->Send(response);
}
// Record the stats and metadata of the first partial result set as the DML
// replay outcome.
// NOTE(review): the row count above is written to responses.back(), while
// the replay outcome copies stats from responses[0]; these are different
// messages whenever more than one partial result set was produced —
// confirm whether the replayed stats can lose the row count.
if (is_dml_query) {
spanner_api::ResultSet replay_result;
*replay_result.mutable_stats() = responses[0].stats();
*replay_result.mutable_metadata() = responses[0].metadata();
txn->SetDmlReplayOutcome(replay_result);
}
return absl::OkStatus();
});
// Change stream queries are executed here, outside the guarded call; in that
// case the result of the handler is returned instead of `status`.
if (change_stream_metadata.is_change_stream_query) {
ChangeStreamsHandler change_streams_handler{change_stream_metadata};
return change_streams_handler.ExecuteChangeStreamQuery(request, stream,
session);
}
return status;
}
REGISTER_GRPC_HANDLER(Spanner, ExecuteStreamingSql);
// Executes a batch of DML statements.
//
// Statements are executed in order inside Transaction::GuardedCall. One result
// set is appended to `response` per successfully executed statement. When a
// statement is not DML, or fails with a non-aborted error, the error is
// written to response->status, the batch stops, and the RPC itself returns
// OK; result sets of the statements executed before it remain in the
// response. An aborted error is returned as the RPC status instead.
//
// Returns InvalidBatchDmlRequest for an empty batch and
// BatchDmlOnlySupportsReadWriteTransaction for Partitioned DML transactions.
absl::Status ExecuteBatchDml(RequestContext* ctx,
const spanner_api::ExecuteBatchDmlRequest* request,
spanner_api::ExecuteBatchDmlResponse* response) {
// Verify the request has DML statement(s).
if (request->statements().empty()) {
return error::InvalidBatchDmlRequest();
}
// Take shared ownerships of session and transaction so that they will keep
// valid throughout this function.
ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Session> session,
GetSession(ctx, request->session()));
// Get underlying transaction.
ZETASQL_RETURN_IF_ERROR(ValidateTransactionSelectorForQuery(request->transaction(),
/*is_dml=*/true));
ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Transaction> txn,
session->FindOrInitTransaction(request->transaction()));
if (txn->IsPartitionedDml()) {
return error::BatchDmlOnlySupportsReadWriteTransaction();
}
// Set default response status to OK. Any error will override this.
*response->mutable_status() = StatusToProto(absl::OkStatus());
// Wrap all operations on this transaction so they are atomic.
return txn->GuardedCall(Transaction::OpType::kDml, [&]() -> absl::Status {
// Register DML request and check for status replay.
// The sql text of the first statement is passed along with the sequence
// number and the hash of the whole request.
const auto state = txn->LookupOrRegisterDmlRequest(
request->seqno(), HashRequest(request), request->statements(0).sql());
if (state.has_value()) {
// A recorded error is returned as the RPC status only when the
// transaction reports a DML registration error.
if (!state->status.ok() &&
txn->DMLErrorType() ==
Transaction::DMLErrorHandlingMode::kDmlRegistrationError) {
return state->status;
}
// The recorded outcome is not an ExecuteBatchDmlResponse, so it cannot be
// replayed as the response of this RPC.
if (!std::holds_alternative<spanner_api::ExecuteBatchDmlResponse>(
state->outcome)) {
return error::ReplayRequestMismatch(request->seqno(),
request->statements(0).sql());
}
*response =
std::get<spanner_api::ExecuteBatchDmlResponse>(state->outcome);
// BatchDml always returns OK status with the error being populated in the
// response.
return absl::OkStatus();
}
// DML needs to explicitly check the transaction status since
// the DML sequence number replay should take priority over returning
// a previously encountered error status.
ZETASQL_RETURN_IF_ERROR(txn->Status());
// Cannot query after commit, rollback, or non-recoverable error.
if (txn->IsInvalid()) {
return error::CannotUseTransactionAfterConstraintError();
}
if (txn->IsCommitted() || txn->IsRolledback()) {
return error::CannotReadOrQueryAfterCommitOrRollback();
}
for (int index = 0; index < request->statements_size(); ++index) {
const auto& statement = request->statements(index);
// A non-DML statement stops the batch: the error goes into the response
// status, the response is recorded for replay, and the RPC returns OK.
if (!backend::IsDMLQuery(statement.sql())) {
absl::Status error = error::ExecuteBatchDmlOnlySupportsDmlStatements(
index, statement.sql());
*response->mutable_status() = StatusToProto(error);
txn->SetDmlReplayOutcome(*response);
return absl::OkStatus();
}
const auto maybe_result = ExecuteQuery(statement, txn);
// Non-aborted failures are reported inside the response and passed to
// txn->MaybeInvalidate; aborted failures become the RPC status.
if (!maybe_result.ok() &&
maybe_result.status().code() != absl::StatusCode::kAborted) {
absl::Status error = maybe_result.status();
*response->mutable_status() = StatusToProto(error);
txn->SetDmlReplayOutcome(*response);
txn->MaybeInvalidate(error);
return absl::OkStatus();
} else if (maybe_result.status().code() == absl::StatusCode::kAborted) {
return maybe_result.status();
}
const auto& result = maybe_result.value();
spanner_api::ResultSet* result_set = response->add_result_sets();
result_set->mutable_stats()->set_row_count_exact(
result.modified_row_count);
// Only populate metadata for first result set.
if (index == 0) {
result_set->mutable_metadata()->mutable_row_type();
if (ShouldReturnTransaction(request->transaction())) {
ZETASQL_ASSIGN_OR_RETURN(
*result_set->mutable_metadata()->mutable_transaction(),
txn->ToProto());
}
}
}
// Set the replay outcome.
txn->SetDmlReplayOutcome(*response);
return absl::OkStatus();
});
}
REGISTER_GRPC_HANDLER(Spanner, ExecuteBatchDml);
} // namespace frontend
} // namespace emulator
} // namespace spanner
} // namespace google