in frontend/handlers/queries.cc [219:360]
absl::Status ExecuteSql(RequestContext* ctx,
const spanner_api::ExecuteSqlRequest* request,
spanner_api::ResultSet* response) {
// Take shared ownerships of session and transaction so that they will keep
// valid throughout this function.
ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Session> session,
GetSession(ctx, request->session()));
// Get underlying transaction.
bool is_dml_query = backend::IsDMLQuery(request->sql());
ZETASQL_RETURN_IF_ERROR(ValidateTransactionSelectorForQuery(request->transaction(),
is_dml_query));
ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Transaction> txn,
session->FindOrInitTransaction(request->transaction()));
ZETASQL_RETURN_IF_ERROR(
ValidateDirectedReadsOption(request->directed_read_options(), txn));
// Wrap all operations on this transaction so they are atomic.
return txn->GuardedCall(
is_dml_query ? Transaction::OpType::kDml : Transaction::OpType::kSql,
[&]() -> absl::Status {
// Register DML request and check for status replay.
if (is_dml_query) {
const auto state = txn->LookupOrRegisterDmlRequest(
request->seqno(), HashRequest(request), request->sql());
if (state.has_value()) {
if (!state->status.ok()) {
return state->status;
}
if (!std::holds_alternative<spanner_api::ResultSet>(
state->outcome)) {
return error::ReplayRequestMismatch(request->seqno(),
request->sql());
}
*response = std::get<spanner_api::ResultSet>(state->outcome);
return state->status;
}
// DML needs to explicitly check the transaction status since
// the DML sequence number replay should take priority over returning
// a previously encountered error status.
ZETASQL_RETURN_IF_ERROR(txn->Status());
}
// Cannot query after commit, rollback, or non-recoverable error.
if (txn->IsInvalid()) {
return error::CannotUseTransactionAfterConstraintError();
}
if (txn->IsCommitted() || txn->IsRolledback()) {
if (txn->IsPartitionedDml()) {
return error::CannotReusePartitionedDmlTransaction();
}
return error::CannotReadOrQueryAfterCommitOrRollback();
}
if (txn->IsReadOnly()) {
if (is_dml_query) {
return error::ReadOnlyTransactionDoesNotSupportDml("ReadOnly");
}
ZETASQL_ASSIGN_OR_RETURN(absl::Time read_timestamp, txn->GetReadTimestamp());
ZETASQL_RETURN_IF_ERROR(ValidateReadTimestampNotTooFarInFuture(
read_timestamp, ctx->env()->clock()->Now()));
}
// Convert and execute provided SQL statement.
ZETASQL_ASSIGN_OR_RETURN(const backend::Query query,
QueryFromProto(request->sql(), request->params(),
request->param_types(),
txn->query_engine()->type_factory(),
txn->schema()->proto_bundle()));
auto maybe_result = txn->ExecuteSql(query, request->query_mode());
if (!maybe_result.ok()) {
absl::Status error = maybe_result.status();
if (txn->IsPartitionedDml()) {
// A Partitioned DML transaction will become invalidated on any
// error.
error.SetPayload(kConstraintError, absl::Cord(""));
}
return error;
}
backend::QueryResult& result = maybe_result.value();
// Populate transaction metadata.
if (ShouldReturnTransaction(request->transaction())) {
ZETASQL_ASSIGN_OR_RETURN(*response->mutable_metadata()->mutable_transaction(),
txn->ToProto());
}
// Return query parameter types.
ZETASQL_RETURN_IF_ERROR(AddUndeclaredParametersFromQueryResult(
&result.parameter_types, response->mutable_metadata()));
if (is_dml_query) {
if (txn->IsPartitionedDml()) {
response->mutable_stats()->set_row_count_lower_bound(
result.modified_row_count);
} else {
response->mutable_stats()->set_row_count_exact(
result.modified_row_count);
}
if (result.rows == nullptr) {
// Set empty row type.
response->mutable_metadata()->mutable_row_type();
} else {
// It contains DML THEN RETURN row results.
ZETASQL_RETURN_IF_ERROR(RowCursorToResultSetProto(result.rows.get(),
/*limit=*/0, response));
}
} else {
ZETASQL_RETURN_IF_ERROR(RowCursorToResultSetProto(result.rows.get(),
/*limit=*/0, response));
}
if (!request->partition_token().empty()) {
ZETASQL_ASSIGN_OR_RETURN(
auto partition_token,
PartitionTokenFromString(request->partition_token()));
ZETASQL_RETURN_IF_ERROR(ValidatePartitionToken(partition_token, request));
if (partition_token.empty_query_partition()) {
response->clear_rows();
}
}
// Add basic stats for PROFILE mode. We do this to interoperate with
// REPL applications written for Cloud Spanner. The profile will not
// contain statistics for plan nodes.
if (request->query_mode() == spanner_api::ExecuteSqlRequest::PROFILE) {
AddQueryStatsFromQueryResult(
result, response->mutable_stats()->mutable_query_stats());
}
// Add an empty query plan if the user requested either PLAN or PROFILE
// query mode.
if (request->query_mode() == spanner_api::ExecuteSqlRequest::PLAN ||
request->query_mode() == spanner_api::ExecuteSqlRequest::PROFILE) {
AddEmptyQueryPlan(response->mutable_stats());
}
if (is_dml_query) {
txn->SetDmlReplayOutcome(*response);
}
return absl::OkStatus();
});
}