frontend/handlers/databases.cc (219 lines of code) (raw):
//
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <memory>
#include <string>
#include <vector>
#include "google/longrunning/operations.pb.h"
#include "google/protobuf/empty.pb.h"
#include "google/protobuf/timestamp.pb.h"
#include "google/spanner/admin/database/v1/common.pb.h"
#include "google/spanner/admin/database/v1/spanner_database_admin.pb.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "absl/time/time.h"
#include "backend/database/database.h"
#include "backend/schema/ddl/operations.pb.h"
#include "backend/schema/parser/ddl_parser.h"
#include "backend/schema/printer/print_ddl.h"
#include "backend/schema/updater/schema_updater.h"
#include "common/errors.h"
#include "common/feature_flags.h"
#include "frontend/common/uris.h"
#include "frontend/converters/time.h"
#include "frontend/entities/database.h"
#include "frontend/server/handler.h"
#include "zetasql/base/status_macros.h"
namespace google {
namespace spanner {
namespace emulator {
namespace frontend {
namespace {
namespace database_api = ::google::spanner::admin::database::v1;
namespace operations_api = ::google::longrunning;
namespace protobuf_api = ::google::protobuf;
} // namespace
// Lists all databases in an instance.
absl::Status ListDatabases(RequestContext* ctx,
const database_api::ListDatabasesRequest* request,
database_api::ListDatabasesResponse* response) {
// Validate that the ListDatabases request is for a valid instance.
ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Instance> instance,
GetInstance(ctx, request->parent()));
// Validate that the page_token provided is a valid database_uri.
if (!request->page_token().empty()) {
absl::string_view project_id, instance_id, database_id;
ZETASQL_RETURN_IF_ERROR(ParseDatabaseUri(request->page_token(), &project_id,
&instance_id, &database_id));
}
ZETASQL_ASSIGN_OR_RETURN(
std::vector<std::shared_ptr<Database>> databases,
ctx->env()->database_manager()->ListDatabases(request->parent()));
int32_t page_size = request->page_size();
static const int32_t kMaxPageSize = 1000;
if (page_size <= 0 || page_size > kMaxPageSize) {
page_size = kMaxPageSize;
}
// Databases returned from database manager are sorted by database_uri and
// thus we use database uri of first database in next page as next_page_token.
for (const auto& database : databases) {
if (response->databases_size() >= page_size) {
response->set_next_page_token(database->database_uri());
break;
}
if (database->database_uri() >= request->page_token()) {
ZETASQL_RETURN_IF_ERROR(database->ToProto(response->add_databases()));
}
}
return absl::OkStatus();
}
REGISTER_GRPC_HANDLER(DatabaseAdmin, ListDatabases);
// Creates a new database within an instance.
absl::Status CreateDatabase(RequestContext* ctx,
const database_api::CreateDatabaseRequest* request,
operations_api::Operation* response) {
// Validate the request.
ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Instance> instance,
GetInstance(ctx, request->parent()));
if (request->create_statement().empty()) {
return error::CreateDatabaseMissingCreateStatement();
}
// Determine the database dialect.
database_api::DatabaseDialect dialect;
// DATABASE_DIALECT_UNSPECIFIED defaults to creating a database with the
// GOOGLE_STANDARD_SQL dialect.
if (request->database_dialect() ==
database_api::DatabaseDialect::DATABASE_DIALECT_UNSPECIFIED) {
dialect = database_api::DatabaseDialect::GOOGLE_STANDARD_SQL;
} else if (!EmulatorFeatureFlags::instance()
.flags()
.enable_postgresql_interface &&
request->database_dialect() ==
database_api::DatabaseDialect::POSTGRESQL) {
return error::CannotCreatePostgreSQLDialectDatabase();
} else {
dialect = request->database_dialect();
}
// Extract database name from create statement.
ZETASQL_ASSIGN_OR_RETURN(
std::unique_ptr<backend::ddl::DDLStatement> stmt,
backend::ParseDDLByDialect(request->create_statement(), dialect));
std::string database_name = stmt->create_database().db_name();
// Validate database name.
ZETASQL_RETURN_IF_ERROR(ValidateDatabaseId(database_name));
// Create the database.
std::string database_uri = MakeDatabaseUri(request->parent(), database_name);
std::vector<std::string> create_statements;
for (const std::string& statement : request->extra_statements()) {
create_statements.push_back(statement);
}
ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Database> database,
ctx->env()->database_manager()->CreateDatabase(
database_uri, backend::SchemaChangeOperation{
.statements = create_statements,
.proto_descriptor_bytes =
request->proto_descriptors(),
.database_dialect = dialect,
}));
// Create an operation tracking the database creation.
ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Operation> operation,
ctx->env()->operation_manager()->CreateOperation(
database_uri, OperationManager::kAutoGeneratedId));
database_api::CreateDatabaseMetadata metadata;
metadata.set_database(database_uri);
operation->SetMetadata(metadata);
database_api::Database response_database;
ZETASQL_RETURN_IF_ERROR(database->ToProto(&response_database));
operation->SetResponse(response_database);
operation->ToProto(response);
return absl::OkStatus();
}
REGISTER_GRPC_HANDLER(DatabaseAdmin, CreateDatabase);
// Gets the current state of a database.
absl::Status GetDatabase(RequestContext* ctx,
const database_api::GetDatabaseRequest* request,
database_api::Database* response) {
ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Database> database,
GetDatabase(ctx, request->name()));
return database->ToProto(response);
}
REGISTER_GRPC_HANDLER(DatabaseAdmin, GetDatabase);
// Updates the schema of a database.
absl::Status UpdateDatabaseDdl(
RequestContext* ctx, const database_api::UpdateDatabaseDdlRequest* request,
operations_api::Operation* response) {
// Validate request URI.
absl::string_view project_id, instance_id, database_id;
ZETASQL_RETURN_IF_ERROR(ParseDatabaseUri(request->database(), &project_id,
&instance_id, &database_id));
// Check for request replay.
if (!request->operation_id().empty()) {
if (!IsValidOperationId(request->operation_id())) {
return error::InvalidOperationId(request->operation_id());
}
const std::string operation_uri =
MakeOperationUri(request->database(), request->operation_id());
auto maybe_operation =
ctx->env()->operation_manager()->GetOperation(operation_uri);
if (maybe_operation.ok()) {
return error::OperationAlreadyExists(operation_uri);
}
}
// Lookup the database by URI.
ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Database> database,
GetDatabase(ctx, request->database()));
std::vector<std::string> statements;
for (const std::string& statement : request->statements()) {
statements.push_back(statement);
}
backend::Database* backend_database = database->backend();
int num_succesful_statements;
absl::Time commit_timestamp;
absl::Status backfill_status;
ZETASQL_RETURN_IF_ERROR(backend_database->UpdateSchema(
backend::SchemaChangeOperation{
.statements = statements,
.proto_descriptor_bytes = request->proto_descriptors(),
.database_dialect = backend_database->dialect()},
&num_succesful_statements, &commit_timestamp, &backfill_status));
// Populate ResultSet metadata.
// For simplicity in emulator, we have implemented the schema updates in such
// a way that all the statements in update ddl execute at the same commit
// timestamp.
database_api::UpdateDatabaseDdlMetadata update_md;
update_md.set_database(request->database());
for (const std::string& statement : statements) {
update_md.add_statements(statement);
}
// Only the timestamps of the successful statements are reported.
for (int i = 0; i < num_succesful_statements; ++i) {
ZETASQL_ASSIGN_OR_RETURN(*update_md.add_commit_timestamps(),
TimestampToProto(commit_timestamp));
}
// Create operation to be returned as part of the response.
// A user-supplied operation_id would have already been validated above.
ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Operation> operation,
ctx->env()->operation_manager()->CreateOperation(
request->database(), request->operation_id()));
operation->SetMetadata(update_md);
if (backfill_status.ok()) {
operation->SetResponse(protobuf_api::Empty());
} else {
operation->SetError(backfill_status);
}
operation->ToProto(response);
return absl::OkStatus();
}
REGISTER_GRPC_HANDLER(DatabaseAdmin, UpdateDatabaseDdl);
// Drops (aka deletes) a database.
absl::Status DropDatabase(RequestContext* ctx,
const database_api::DropDatabaseRequest* request,
protobuf_api::Empty* response) {
// Validate the request.
absl::string_view project_id, instance_id, database_id;
ZETASQL_RETURN_IF_ERROR(ParseDatabaseUri(request->database(), &project_id,
&instance_id, &database_id));
ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Instance> instance,
GetInstance(ctx, MakeInstanceUri(project_id, instance_id)));
// Clean up resources associated with the database.
auto maybe_database =
ctx->env()->database_manager()->GetDatabase(request->database());
if (maybe_database.ok()) {
ZETASQL_ASSIGN_OR_RETURN(
std::vector<std::shared_ptr<Session>> sessions,
ctx->env()->session_manager()->ListSessions(request->database()));
for (const auto& session : sessions) {
ZETASQL_RETURN_IF_ERROR(
ctx->env()->session_manager()->DeleteSession(session->session_uri()));
}
}
// Clean up the database.
return ctx->env()->database_manager()->DeleteDatabase(request->database());
}
REGISTER_GRPC_HANDLER(DatabaseAdmin, DropDatabase);
// Returns the schema of a database as a list of formatted DDL statements.
absl::Status GetDatabaseDdl(RequestContext* ctx,
const database_api::GetDatabaseDdlRequest* request,
database_api::GetDatabaseDdlResponse* response) {
ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Database> database,
GetDatabase(ctx, request->database()));
absl::StatusOr<std::vector<std::string>> printed_statements =
backend::PrintDDLStatements(database->backend()->GetLatestSchema());
ZETASQL_RETURN_IF_ERROR(printed_statements.status());
for (const auto& statement : *printed_statements) {
response->add_statements(statement);
}
return absl::OkStatus();
}
REGISTER_GRPC_HANDLER(DatabaseAdmin, GetDatabaseDdl);
} // namespace frontend
} // namespace emulator
} // namespace spanner
} // namespace google