frontend/handlers/reads.cc (116 lines of code) (raw):
//
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include "frontend/converters/reads.h"
#include <memory>
#include "google/spanner/v1/result_set.pb.h"
#include "google/spanner/v1/spanner.pb.h"
#include "google/spanner/v1/transaction.pb.h"
#include "backend/common/ids.h"
#include "common/errors.h"
#include "frontend/common/protos.h"
#include "frontend/common/validations.h"
#include "frontend/entities/session.h"
#include "frontend/entities/transaction.h"
#include "frontend/server/handler.h"
#include "absl/status/status.h"
#include "zetasql/base/status_macros.h"
namespace google {
namespace spanner {
namespace emulator {
namespace frontend {
namespace spanner_api = ::google::spanner::v1;
namespace {
// Largest amount by which a read timestamp may exceed the current time;
// ValidateReadTimestampNotTooFarInFuture() rejects anything beyond it.
// Declared constexpr so the value is a compile-time constant that cannot be
// reassigned at runtime and needs no dynamic initialization.
constexpr absl::Duration kMaxFutureReadDuration = absl::Hours(1);
// Checks that `read_timestamp` is not more than kMaxFutureReadDuration ahead
// of `now`.
//
// Returns OK when the timestamp is within the allowed window (this includes
// timestamps at or before `now`); otherwise returns
// error::ReadTimestampTooFarInFuture for the offending timestamp.
absl::Status ValidateReadTimestampNotTooFarInFuture(absl::Time read_timestamp,
                                                    absl::Time now) {
  // How far ahead of `now` the requested timestamp lies (negative if it is in
  // the past).
  const absl::Duration lead_time = read_timestamp - now;
  if (lead_time <= kMaxFutureReadDuration) {
    return absl::OkStatus();
  }
  return error::ReadTimestampTooFarInFuture(read_timestamp);
}
// Checks that a single-use transaction selector supplied with a read request
// asks for a read-only transaction.
//
// Selectors of any other kind are not inspected and yield OK. A single-use
// selector whose options are not read-only yields
// error::InvalidModeForReadOnlySingleUseTransaction.
absl::Status ValidateTransactionSelectorForRead(
    const spanner_api::TransactionSelector& selector) {
  const bool is_single_use =
      selector.selector_case() ==
      spanner_api::TransactionSelector::SelectorCase::kSingleUse;
  if (!is_single_use) {
    return absl::OkStatus();
  }

  const bool is_read_only = selector.single_use().mode_case() ==
                            spanner_api::TransactionOptions::kReadOnly;
  if (is_read_only) {
    return absl::OkStatus();
  }
  return error::InvalidModeForReadOnlySingleUseTransaction();
}
} // namespace
// Handles the Spanner Read RPC: performs the read described by `request` and
// writes the complete result into the single `response` message.
//
// Any error from session lookup, transaction selection, validation, request
// parsing, the backend read, or result conversion is returned as the status
// of the call.
absl::Status Read(RequestContext* ctx, const spanner_api::ReadRequest* request,
                  spanner_api::ResultSet* response) {
  // Resolve the session named by the request.
  ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Session> active_session,
                   GetSession(ctx, request->session()));

  // Validate the selector first, then find (or create) the transaction it
  // designates.
  const spanner_api::TransactionSelector& selector = request->transaction();
  ZETASQL_RETURN_IF_ERROR(ValidateTransactionSelectorForRead(selector));
  ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Transaction> read_txn,
                   active_session->FindOrInitTransaction(selector));
  ZETASQL_RETURN_IF_ERROR(ValidateDirectedReadsOption(
      request->directed_read_options(), read_txn));

  // Everything that touches the transaction runs inside GuardedCall so the
  // steps below execute as one atomic operation on it.
  const auto run_read = [&]() -> absl::Status {
    // A transaction that hit a non-recoverable error cannot be used again.
    if (read_txn->IsInvalid()) {
      return error::CannotUseTransactionAfterConstraintError();
    }

    // Reads are refused once the transaction has committed or rolled back.
    const bool already_finished =
        read_txn->IsCommitted() || read_txn->IsRolledback();
    if (already_finished) {
      return error::CannotReadOrQueryAfterCommitOrRollback();
    }

    // Read-only transactions must not read too far into the future.
    if (read_txn->IsReadOnly()) {
      ZETASQL_ASSIGN_OR_RETURN(absl::Time snapshot_time,
                       read_txn->GetReadTimestamp());
      const absl::Time current_time = ctx->env()->clock()->Now();
      ZETASQL_RETURN_IF_ERROR(
          ValidateReadTimestampNotTooFarInFuture(snapshot_time, current_time));
    }

    // Translate the request proto into backend read arguments.
    backend::ReadArg parsed_read;
    ZETASQL_RETURN_IF_ERROR(
        ReadArgFromProto(*read_txn->schema(), *request, &parsed_read));

    // Run the read against the backend.
    std::unique_ptr<backend::RowCursor> rows;
    ZETASQL_RETURN_IF_ERROR(read_txn->Read(parsed_read, &rows));

    // Attach the transaction description to the response metadata when the
    // selector calls for it.
    if (ShouldReturnTransaction(selector)) {
      ZETASQL_ASSIGN_OR_RETURN(*response->mutable_metadata()->mutable_transaction(),
                       read_txn->ToProto());
    }

    // Serialize the rows into the response, honoring the request's limit.
    return RowCursorToResultSetProto(rows.get(), request->limit(), response);
  };
  return read_txn->GuardedCall(Transaction::OpType::kRead, run_read);
}
REGISTER_GRPC_HANDLER(Spanner, Read);
// Reads rows from the database, returning all results as a stream.
//
// StreamingReads do not support resume_tokens in the emulator. This
// implementation does not limit the size of the response and therefore,
// chunked_value will always be false.
//
// `ctx` supplies the environment (session lookup and clock), `request`
// describes the read, and every converted PartialResultSet is written to
// `stream` in order. Any error from session lookup, transaction selection,
// validation, request parsing, the backend read, or result conversion is
// returned as the status of the call; in that case nothing is sent on the
// stream.
absl::Status StreamingRead(
    RequestContext* ctx, const spanner_api::ReadRequest* request,
    ServerStream<spanner_api::PartialResultSet>* stream) {
  // Get session information.
  ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Session> session,
                   GetSession(ctx, request->session()));

  // Get underlying transaction.
  ZETASQL_RETURN_IF_ERROR(ValidateTransactionSelectorForRead(request->transaction()));
  ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Transaction> txn,
                   session->FindOrInitTransaction(request->transaction()));
  ZETASQL_RETURN_IF_ERROR(
      ValidateDirectedReadsOption(request->directed_read_options(), txn));

  // Wrap all operations on this transaction so they are atomic.
  return txn->GuardedCall(Transaction::OpType::kRead, [&]() -> absl::Status {
    // Cannot read after commit, rollback, or non-recoverable error.
    if (txn->IsInvalid()) {
      return error::CannotUseTransactionAfterConstraintError();
    }
    if (txn->IsCommitted() || txn->IsRolledback()) {
      return error::CannotReadOrQueryAfterCommitOrRollback();
    }

    // Read-only transactions must not read too far into the future.
    if (txn->IsReadOnly()) {
      ZETASQL_ASSIGN_OR_RETURN(absl::Time read_timestamp, txn->GetReadTimestamp());
      ZETASQL_RETURN_IF_ERROR(ValidateReadTimestampNotTooFarInFuture(
          read_timestamp, ctx->env()->clock()->Now()));
    }

    // Parse read request.
    backend::ReadArg read_arg;
    ZETASQL_RETURN_IF_ERROR(ReadArgFromProto(*txn->schema(), *request, &read_arg));

    // Execute read on backend.
    std::unique_ptr<backend::RowCursor> cursor;
    ZETASQL_RETURN_IF_ERROR(txn->Read(read_arg, &cursor));

    // Convert read results to protos.
    ZETASQL_ASSIGN_OR_RETURN(
        std::vector<spanner_api::PartialResultSet> responses,
        RowCursorToPartialResultSetProtos(cursor.get(), request->limit()));

    // Populate transaction metadata on the first message of the stream.
    if (ShouldReturnTransaction(request->transaction())) {
      // Calling front() on an empty vector is undefined behavior. The
      // converter presumably always yields at least one message (TODO
      // confirm); if it ever does not, add an empty one so the transaction
      // can still be reported to the client.
      if (responses.empty()) {
        responses.emplace_back();
      }
      ZETASQL_ASSIGN_OR_RETURN(
          *responses.front().mutable_metadata()->mutable_transaction(),
          txn->ToProto());
    }

    // Send results back to client.
    for (const auto& response : responses) {
      stream->Send(response);
    }
    return absl::OkStatus();
  });
}
REGISTER_GRPC_HANDLER(Spanner, StreamingRead);
} // namespace frontend
} // namespace emulator
} // namespace spanner
} // namespace google