frontend/converters/mutations.cc (151 lines of code) (raw):

// // Copyright 2020 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // #include "frontend/converters/mutations.h" #include <algorithm> #include <string> #include <utility> #include <vector> #include "google/protobuf/struct.pb.h" #include "google/spanner/v1/keys.pb.h" #include "google/spanner/v1/result_set.pb.h" #include "zetasql/public/type.h" #include "zetasql/public/value.h" #include "absl/strings/str_cat.h" #include "backend/access/write.h" #include "backend/datamodel/key.h" #include "backend/datamodel/key_range.h" #include "backend/datamodel/key_set.h" #include "backend/datamodel/value.h" #include "backend/schema/catalog/column.h" #include "backend/schema/catalog/schema.h" #include "backend/schema/catalog/table.h" #include "common/errors.h" #include "frontend/converters/keys.h" #include "frontend/converters/types.h" #include "frontend/converters/values.h" #include "absl/status/status.h" namespace google { namespace spanner { namespace emulator { namespace frontend { namespace spanner_api = ::google::spanner::v1; namespace { absl::Status WriteFromProto(const backend::Schema& schema, const spanner_api::Mutation::Write& write_pb, backend::MutationOpType op_type, backend::Mutation* mutation) { if (write_pb.table().empty()) { return error::MutationTableRequired(); } const backend::Table* table = schema.FindTable(write_pb.table()); if (table == nullptr) { return error::TableNotFound(write_pb.table()); } // Check that columns exist within table and get the column names. std::vector<const backend::Column*> columns(write_pb.columns_size()); std::vector<std::string> column_names(write_pb.columns_size()); for (int i = 0; i < write_pb.columns_size(); ++i) { columns[i] = table->FindColumn(write_pb.columns(i)); if (columns[i] == nullptr) { return error::ColumnNotFound(write_pb.table(), write_pb.columns(i)); } column_names[i] = columns[i]->Name(); } if (write_pb.values_size() == 0) { return error::MissingRequiredFieldError("Write.values"); } // Populate the list of values for the rows that will be written to. std::vector<backend::ValueList> value_list; for (const google::protobuf::ListValue& values : write_pb.values()) { backend::ValueList row_values; if (values.values_size() != columns.size()) { return error::MutationColumnAndValueSizeMismatch(columns.size(), values.values_size()); } for (int i = 0; i < columns.size(); ++i) { ZETASQL_ASSIGN_OR_RETURN(row_values.emplace_back(), ValueFromProto(values.values(i), columns[i]->GetType())); } value_list.push_back(std::move(row_values)); } mutation->AddWriteOp(op_type, table->Name(), std::move(column_names), std::move(value_list)); return absl::OkStatus(); } absl::Status ValidateDeleteRange(const backend::KeyRange& range) { int max_columns = std::max(range.start_key().NumColumns(), range.limit_key().NumColumns()); int min_columns = std::min(range.start_key().NumColumns(), range.limit_key().NumColumns()); // No-op if no columns were specified at all. if (max_columns == 0) { return absl::OkStatus(); } // Early exit if one of the keys has more than one extra part. if (max_columns > min_columns + 1) { return error::BadDeleteRange(range.start_key().DebugString(), range.limit_key().DebugString()); } // If we reached here, the number of columns in start and limit are either // equal, or differ by one. We check up to the prefix for equality. int num_compare_columns = max_columns == min_columns ? min_columns - 1 : min_columns; for (int i = 0; i < num_compare_columns; ++i) { if (range.start_key().ColumnValue(i) != range.limit_key().ColumnValue(i)) { return error::BadDeleteRange(range.start_key().DebugString(), range.limit_key().DebugString()); } } return absl::OkStatus(); } absl::Status DeleteFromProto(const backend::Schema& schema, const spanner_api::Mutation::Delete& delete_pb, backend::Mutation* mutation) { // Verify that the table exists. if (delete_pb.table().empty()) { return error::MutationTableRequired(); } const backend::Table* table = schema.FindTable(delete_pb.table()); if (table == nullptr) { return error::TableNotFound(delete_pb.table()); } // Parse the delete key set from the request proto. ZETASQL_ASSIGN_OR_RETURN(backend::KeySet key_set, KeySetFromProto(delete_pb.key_set(), *table)); // Perform extra validations for delete ranges in the request key set. for (const backend::KeyRange& range : key_set.ranges()) { ZETASQL_RETURN_IF_ERROR(ValidateDeleteRange(range)); } mutation->AddDeleteOp(table->Name(), key_set); return absl::OkStatus(); } } // namespace absl::Status MutationFromProto( const backend::Schema& schema, const google::protobuf::RepeatedPtrField<spanner_api::Mutation>& mutation_pbs, backend::Mutation* mutation) { for (const spanner_api::Mutation& mutation_pb : mutation_pbs) { switch (mutation_pb.operation_case()) { case spanner_api::Mutation::kInsert: ZETASQL_RETURN_IF_ERROR(WriteFromProto(schema, mutation_pb.insert(), backend::MutationOpType::kInsert, mutation)); break; case spanner_api::Mutation::kUpdate: ZETASQL_RETURN_IF_ERROR(WriteFromProto(schema, mutation_pb.update(), backend::MutationOpType::kUpdate, mutation)); break; case spanner_api::Mutation::kInsertOrUpdate: ZETASQL_RETURN_IF_ERROR(WriteFromProto(schema, mutation_pb.insert_or_update(), backend::MutationOpType::kInsertOrUpdate, mutation)); break; case spanner_api::Mutation::kReplace: ZETASQL_RETURN_IF_ERROR(WriteFromProto(schema, mutation_pb.replace(), backend::MutationOpType::kReplace, mutation)); break; case spanner_api::Mutation::kDelete: ZETASQL_RETURN_IF_ERROR( DeleteFromProto(schema, mutation_pb.delete_(), mutation)); break; default: return error::MissingRequiredFieldError("Mutation.operation"); } } return absl::OkStatus(); } } // namespace frontend } // namespace emulator } // namespace spanner } // namespace google