in frontend/handlers/queries.cc [368:540]
// Handles a streaming ExecuteSql request.
//
// Resolves the session and transaction named by `request`, validates the
// transaction selector and directed-read options, and then runs the SQL
// statement inside `txn->GuardedCall`. Results are converted to
// `PartialResultSet` protos and written to `stream`. For DML statements the
// stats/metadata of the first response are recorded on the transaction so that
// a later request with the same sequence number can be answered from the
// recorded outcome instead of being re-executed.
//
// Change stream queries are only detected inside the guarded call; they are
// executed afterwards by `ChangeStreamsHandler`, outside the lambda.
//
// Returns the status produced by the guarded call, or the status of the change
// stream handler when the statement is a change stream query.
absl::Status ExecuteStreamingSql(
    RequestContext* ctx, const spanner_api::ExecuteSqlRequest* request,
    ServerStream<spanner_api::PartialResultSet>* stream) {
  // Take shared ownerships of session and transaction so that they will keep
  // valid throughout this function.
  ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Session> session,
                   GetSession(ctx, request->session()));

  // Value-initialized so that the check after GuardedCall reads a well-defined
  // `is_change_stream_query` flag even when the lambda returns (or is never
  // reached) before the metadata is assigned.
  backend::ChangeStreamQueryValidator::ChangeStreamMetadata
      change_stream_metadata{};

  // Get underlying transaction.
  const bool is_dml_query = backend::IsDMLQuery(request->sql());
  ZETASQL_RETURN_IF_ERROR(ValidateTransactionSelectorForQuery(request->transaction(),
                                                      is_dml_query));
  ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Transaction> txn,
                   session->FindOrInitTransaction(request->transaction()));
  ZETASQL_RETURN_IF_ERROR(
      ValidateDirectedReadsOption(request->directed_read_options(), txn));

  // Wrap all operations on this transaction so they are atomic.
  absl::Status status = txn->GuardedCall(
      is_dml_query ? Transaction::OpType::kDml : Transaction::OpType::kSql,
      [&]() -> absl::Status {
        // Register DML request and check for status replay.
        if (is_dml_query) {
          const auto state = txn->LookupOrRegisterDmlRequest(
              request->seqno(), HashRequest(request), request->sql());
          if (state.has_value()) {
            // A recorded outcome exists for this request: answer from it
            // rather than executing the statement again.
            if (!state->status.ok()) {
              return state->status;
            }
            if (!std::holds_alternative<spanner_api::ResultSet>(
                    state->outcome)) {
              return error::ReplayRequestMismatch(request->seqno(),
                                                  request->sql());
            }
            // Bind by reference: only the stats and metadata are copied into
            // the response, so a full copy of the stored ResultSet is not
            // needed.
            const spanner_api::ResultSet& replay_result =
                std::get<spanner_api::ResultSet>(state->outcome);
            spanner_api::PartialResultSet response;
            *response.mutable_stats() = replay_result.stats();
            *response.mutable_metadata() = replay_result.metadata();
            stream->Send(response);
            return state->status;
          }
          // DML needs to explicitly check the transaction status since
          // the DML sequence number replay should take priority over returning
          // a previously encountered error status.
          ZETASQL_RETURN_IF_ERROR(txn->Status());
        }

        // Cannot query after commit, rollback, or non-recoverable error.
        if (txn->IsInvalid()) {
          return error::CannotUseTransactionAfterConstraintError();
        }
        if (txn->IsCommitted() || txn->IsRolledback()) {
          if (txn->IsPartitionedDml()) {
            return error::CannotReusePartitionedDmlTransaction();
          }
          return error::CannotReadOrQueryAfterCommitOrRollback();
        }
        if (txn->IsReadOnly()) {
          if (is_dml_query) {
            return error::ReadOnlyTransactionDoesNotSupportDml("ReadOnly");
          }
          ZETASQL_ASSIGN_OR_RETURN(absl::Time read_timestamp, txn->GetReadTimestamp());
          ZETASQL_RETURN_IF_ERROR(ValidateReadTimestampNotTooFarInFuture(
              read_timestamp, ctx->env()->clock()->Now()));
        }

        // Convert and execute provided SQL statement.
        ZETASQL_ASSIGN_OR_RETURN(const backend::Query query,
                         QueryFromProto(request->sql(), request->params(),
                                        request->param_types(),
                                        txn->query_engine()->type_factory(),
                                        txn->schema()->proto_bundle()));
        const bool in_read_write_txn =
            txn->IsReadWrite() || txn->IsPartitionedDml();
        ZETASQL_ASSIGN_OR_RETURN(change_stream_metadata,
                         backend::QueryEngine::TryGetChangeStreamMetadata(
                             query, txn->schema(), in_read_write_txn));
        // if current query is a change stream query, return and exit current
        // transaction lambda to avoid nested transaction call.
        if (change_stream_metadata.is_change_stream_query) {
          return absl::OkStatus();
        }

        auto maybe_result = txn->ExecuteSql(query, request->query_mode());
        if (!maybe_result.ok()) {
          absl::Status error = maybe_result.status();
          if (txn->IsPartitionedDml()) {
            // A Partitioned DML transaction will become invalidated on any
            // error.
            error.SetPayload(kConstraintError, absl::Cord(""));
          }
          return error;
        }
        backend::QueryResult& result = maybe_result.value();

        std::vector<spanner_api::PartialResultSet> responses;
        if (is_dml_query) {
          if (result.rows == nullptr) {
            // No returned rows: emit a single response with an empty row
            // type.
            responses.emplace_back();
            responses.back().mutable_metadata()->mutable_row_type();
          } else {
            // It contains DML THEN RETURN row results.
            ZETASQL_ASSIGN_OR_RETURN(responses, RowCursorToPartialResultSetProtos(
                                            result.rows.get(), /*limit=*/0));
          }
          // The modified row count is attached to the last response: as a
          // lower bound for Partitioned DML, as an exact count otherwise.
          // NOTE(review): assumes RowCursorToPartialResultSetProtos never
          // returns an empty vector - confirm against its contract.
          if (txn->IsPartitionedDml()) {
            responses.back().mutable_stats()->set_row_count_lower_bound(
                result.modified_row_count);
          } else {
            responses.back().mutable_stats()->set_row_count_exact(
                result.modified_row_count);
          }
        } else {
          ZETASQL_ASSIGN_OR_RETURN(responses, RowCursorToPartialResultSetProtos(
                                          result.rows.get(), /*limit=*/0));
        }

        if (!request->partition_token().empty()) {
          ZETASQL_ASSIGN_OR_RETURN(
              auto partition_token,
              PartitionTokenFromString(request->partition_token()));
          ZETASQL_RETURN_IF_ERROR(ValidatePartitionToken(partition_token, request));
          if (partition_token.empty_query_partition()) {
            // Clear all partial responses except the first one. Return only
            // metadata in the first partial response.
            responses.resize(1);
            responses.front().clear_values();
            responses.front().clear_chunked_value();
          }
        }

        // Populate transaction metadata.
        if (ShouldReturnTransaction(request->transaction())) {
          ZETASQL_ASSIGN_OR_RETURN(
              *responses.front().mutable_metadata()->mutable_transaction(),
              txn->ToProto());
        }

        // Return query parameter types.
        ZETASQL_RETURN_IF_ERROR(AddUndeclaredParametersFromQueryResult(
            &result.parameter_types, responses.front().mutable_metadata()));

        // Add basic stats for PROFILE mode. We do this to interoperate with
        // REPL applications written for Cloud Spanner. The profile will not
        // contain statistics for plan nodes.
        if (request->query_mode() == spanner_api::ExecuteSqlRequest::PROFILE) {
          AddQueryStatsFromQueryResult(
              result, responses.front().mutable_stats()->mutable_query_stats());
        }

        // Send results back to client.
        for (const auto& response : responses) {
          stream->Send(response);
        }

        // Record the stats and metadata of the first response as the replay
        // outcome for this DML request.
        if (is_dml_query) {
          spanner_api::ResultSet replay_result;
          *replay_result.mutable_stats() = responses[0].stats();
          *replay_result.mutable_metadata() = responses[0].metadata();
          txn->SetDmlReplayOutcome(replay_result);
        }
        return absl::OkStatus();
      });

  // Change stream queries are executed here, after the guarded call has
  // returned, rather than inside the transaction lambda.
  if (change_stream_metadata.is_change_stream_query) {
    ChangeStreamsHandler change_streams_handler{change_stream_metadata};
    return change_streams_handler.ExecuteChangeStreamQuery(request, stream,
                                                           session);
  }
  return status;
}