frontend/converters/reads.cc (211 lines of code) (raw):

// // Copyright 2020 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // #include "frontend/converters/reads.h" #include <limits> #include <vector> #include "google/protobuf/struct.pb.h" #include "google/spanner/v1/keys.pb.h" #include "google/spanner/v1/result_set.pb.h" #include "google/spanner/v1/transaction.pb.h" #include "zetasql/public/type.h" #include "zetasql/public/value.h" #include "absl/status/statusor.h" #include "absl/strings/str_cat.h" #include "backend/access/write.h" #include "backend/datamodel/key.h" #include "backend/datamodel/key_range.h" #include "backend/datamodel/key_set.h" #include "backend/datamodel/value.h" #include "backend/schema/catalog/column.h" #include "backend/schema/catalog/schema.h" #include "backend/schema/catalog/table.h" #include "backend/transaction/options.h" #include "common/errors.h" #include "common/limits.h" #include "frontend/converters/chunking.h" #include "frontend/converters/keys.h" #include "frontend/converters/partition.h" #include "frontend/converters/time.h" #include "frontend/converters/types.h" #include "frontend/converters/values.h" #include "frontend/proto/partition_token.pb.h" #include "absl/status/status.h" #include "zetasql/base/status_macros.h" namespace google { namespace spanner { namespace emulator { namespace frontend { namespace spanner_api = ::google::spanner::v1; namespace { absl::Status ResultSetMetadataToProto(backend::RowCursor* cursor, v1::ResultSetMetadata* metadata_pb) { for (int i = 0; i < cursor->NumColumns(); ++i) { auto* field_pb = metadata_pb->mutable_row_type()->add_fields(); field_pb->set_name(cursor->ColumnName(i)); ZETASQL_RETURN_IF_ERROR( TypeToProto(cursor->ColumnType(i), field_pb->mutable_type())) << " when converting column " << cursor->ColumnName(i) << " of type " << cursor->ColumnType(i) << " at position " << i << " in row cursor"; } return absl::OkStatus(); } absl::Status ValidateStaleness(absl::Duration staleness) { if (staleness < absl::ZeroDuration()) { return error::StalenessMustBeNonNegative(); } return absl::OkStatus(); } absl::Status ValidateMinReadTimestamp(absl::Time min_read_timestamp) { const int64_t timestamp = absl::ToUnixMicros(min_read_timestamp); if (timestamp < 0 || timestamp == std::numeric_limits<int64_t>::max()) { return error::InvalidMinReadTimestamp(min_read_timestamp); } return absl::OkStatus(); } absl::Status ValidateExactReadTimestamp(absl::Time exact_read_timestamp) { const int64_t timestamp = absl::ToUnixMicros(exact_read_timestamp); if (timestamp < 0 || timestamp == std::numeric_limits<int64_t>::max()) { return error::InvalidExactReadTimestamp(exact_read_timestamp); } return absl::OkStatus(); } absl::Status ValidatePartitionToken( const PartitionToken& partition_token, const google::spanner::v1::ReadRequest& request) { if (partition_token.session() != request.session()) { return error::ReadFromDifferentSession(); } if (request.transaction().selector_case() != v1::TransactionSelector::kId || partition_token.transaction_id() != request.transaction().id()) { return error::ReadFromDifferentTransaction(); } if (!partition_token.has_read_params()) { return error::ReadFromDifferentParameters(); } auto read_params = partition_token.read_params(); if (read_params.table() != request.table() || read_params.index() != request.index() || read_params.columns_size() != request.columns_size()) { return error::ReadFromDifferentParameters(); } for (int i = 0; i < request.columns_size(); i++) { if (read_params.columns(i) != request.columns(i)) { return error::ReadFromDifferentParameters(); } } if (read_params.key_set().SerializeAsString() != request.key_set().SerializeAsString()) { return error::ReadFromDifferentParameters(); } return absl::OkStatus(); } } // namespace absl::StatusOr<backend::ReadOnlyOptions> ReadOnlyOptionsFromProto( const spanner_api::TransactionOptions::ReadOnly& proto) { using ReadOnly = spanner_api::TransactionOptions::ReadOnly; backend::ReadOnlyOptions options; switch (proto.timestamp_bound_case()) { case ReadOnly::kMinReadTimestamp: { ZETASQL_ASSIGN_OR_RETURN(options.timestamp, TimestampFromProto(proto.min_read_timestamp())); ZETASQL_RETURN_IF_ERROR(ValidateMinReadTimestamp(options.timestamp)); options.bound = backend::TimestampBound::kMinTimestamp; break; } case ReadOnly::kMaxStaleness: { ZETASQL_ASSIGN_OR_RETURN(options.staleness, DurationFromProto(proto.max_staleness())); ZETASQL_RETURN_IF_ERROR(ValidateStaleness(options.staleness)); options.bound = backend::TimestampBound::kMaxStaleness; break; } case ReadOnly::kReadTimestamp: { ZETASQL_ASSIGN_OR_RETURN(options.timestamp, TimestampFromProto(proto.read_timestamp())); ZETASQL_RETURN_IF_ERROR(ValidateExactReadTimestamp(options.timestamp)); options.bound = backend::TimestampBound::kExactTimestamp; break; } case ReadOnly::kExactStaleness: { ZETASQL_ASSIGN_OR_RETURN(options.staleness, DurationFromProto(proto.exact_staleness())); ZETASQL_RETURN_IF_ERROR(ValidateStaleness(options.staleness)); options.bound = backend::TimestampBound::kExactStaleness; break; } case ReadOnly::kStrong: if (!proto.strong()) { return error::StrongReadOptionShouldBeTrue(); } ABSL_FALLTHROUGH_INTENDED; case ReadOnly::TIMESTAMP_BOUND_NOT_SET: options.bound = backend::TimestampBound::kStrongRead; break; } return options; } absl::Status ReadArgFromProto(const backend::Schema& schema, const google::spanner::v1::ReadRequest& request, backend::ReadArg* read_arg) { if (!request.has_key_set()) { return error::MissingRequiredFieldError("ReadRequest.key_set"); } if (request.table().empty()) { return error::MutationTableRequired(); } if (request.limit() < 0) { return error::InvalidReadLimit(); } if (request.limit() > 0 && !request.partition_token().empty()) { return error::InvalidReadLimitWithPartitionToken(); } auto key_set = request.key_set(); if (!request.partition_token().empty()) { ZETASQL_ASSIGN_OR_RETURN(auto partition_token, PartitionTokenFromString(request.partition_token())); ZETASQL_RETURN_IF_ERROR(ValidatePartitionToken(partition_token, request)); key_set = partition_token.partitioned_key_set(); } read_arg->table = request.table(); read_arg->index = request.index(); read_arg->columns.assign(request.columns().begin(), request.columns().end()); const backend::Table* table = schema.FindTable(request.table()); if (table == nullptr) { return error::TableNotFound(request.table()); } if (!request.index().empty()) { const backend::Index* index = schema.FindIndex(request.index()); if (index == nullptr) { return error::IndexNotFound(request.index(), request.table()); } table = index->index_data_table(); } ZETASQL_ASSIGN_OR_RETURN(read_arg->key_set, KeySetFromProto(key_set, *table)); return absl::OkStatus(); } absl::Status RowCursorToResultSetProto(backend::RowCursor* cursor, int limit, spanner_api::ResultSet* result_pb) { ZETASQL_RETURN_IF_ERROR( ResultSetMetadataToProto(cursor, result_pb->mutable_metadata())); // Iterate over all rows and populate column values into ResultSet. int row_count = 0; while (cursor->Next()) { auto* row_pb = result_pb->add_rows(); for (int i = 0; i < cursor->NumColumns(); ++i) { ZETASQL_ASSIGN_OR_RETURN(*row_pb->add_values(), ValueToProto(cursor->ColumnValue(i))); } ++row_count; if (limit > 0 && limit == row_count) { break; } } return absl::OkStatus(); } absl::StatusOr<std::vector<spanner_api::PartialResultSet>> RowCursorToPartialResultSetProtos(backend::RowCursor* cursor, int limit) { spanner_api::ResultSet result_set; ZETASQL_RETURN_IF_ERROR(RowCursorToResultSetProto(cursor, limit, &result_set)); return ChunkResultSet(result_set, limits::kMaxStreamingChunkSize); } } // namespace frontend } // namespace emulator } // namespace spanner } // namespace google