frontend/server/server.cc (250 lines of code) (raw):
//
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include "frontend/server/server.h"
#include <memory>
#include <string>
#include <utility>
#include "zetasql/base/logging.h"
#include "google/iam/v1/iam_policy.pb.h"
#include "google/iam/v1/policy.pb.h"
#include "google/longrunning/operations.grpc.pb.h"
#include "google/protobuf/empty.pb.h"
#include "google/rpc/error_details.pb.h"
#include "google/spanner/admin/database/v1/spanner_database_admin.grpc.pb.h"
#include "google/spanner/admin/instance/v1/spanner_instance_admin.grpc.pb.h"
#include "google/spanner/v1/commit_response.pb.h"
#include "google/spanner/v1/result_set.pb.h"
#include "google/spanner/v1/spanner.grpc.pb.h"
#include "google/spanner/v1/spanner.pb.h"
#include "google/spanner/v1/transaction.pb.h"
#include "absl/memory/memory.h"
#include "common/constants.h"
#include "common/errors.h"
#include "common/limits.h"
#include "frontend/common/status.h"
#include "frontend/server/handler.h"
#include "frontend/server/request_context.h"
#include "grpcpp/server_builder.h"
#include "grpcpp/support/status.h"
namespace google {
namespace spanner {
namespace emulator {
namespace frontend {
// Convenience namespace aliases.
namespace database_api = ::google::spanner::admin::database::v1;
namespace iam_api = ::google::iam::v1;
namespace instance_api = ::google::spanner::admin::instance::v1;
namespace operations_api = ::google::longrunning;
namespace protobuf_api = ::google::protobuf;
namespace spanner_api = ::google::spanner::v1;
namespace {
void MaybeAddTrailingMetadata(const absl::Status& status, RequestContext* ctx) {
if (!status.ok()) {
// Check for ResourceInfo within the returned status and append it as extra
// trailing metadata. The Java client library expects the ResourceInfo to be
// added as additional trailing metadata.
auto payload = status.GetPayload(kResourceInfoType);
if (payload.has_value()) {
std::string serialized_info(payload.value());
ctx->grpc()->AddTrailingMetadata(kResourceInfoBinaryHeader,
serialized_info);
}
}
}
} // namespace
// Invokes the given unary gRPC method on the given service by looking up the
// handler registry. Returns INTERNAL error if the handler could not be found.
template <typename RequestT, typename ResponseT>
absl::Status Invoke(const std::string& service_name,
const std::string& method_name,
grpc::ServerContext* grpc_ctx, ServerEnv* env,
const RequestT* request, ResponseT* response) {
GRPCHandlerBase* handler = GetHandler(service_name, method_name);
if (!handler) {
return error::Internal(absl::StrCat("Could not find handler for ",
service_name, ".", method_name));
}
RequestContext ctx(env, grpc_ctx);
absl::Status status =
dynamic_cast<UnaryGRPCHandler<RequestT, ResponseT>*>(handler)->Run(
&ctx, request, response);
MaybeAddTrailingMetadata(status, &ctx);
return status;
}
// Invokes the given server streaming gRPC method on the given service by
// looking up the handler registry. Returns INTERNAL error if the handler could
// not be found.
template <typename RequestT, typename ResponseT>
absl::Status Invoke(const std::string& service_name,
const std::string& method_name,
grpc::ServerContext* grpc_ctx, ServerEnv* env,
const RequestT* request,
grpc::ServerWriter<ResponseT>* writer) {
GRPCHandlerBase* handler = GetHandler(service_name, method_name);
if (!handler) {
return error::Internal(absl::StrCat("Could not find handler for ",
service_name, ".", method_name));
}
RequestContext ctx(env, grpc_ctx);
absl::Status status =
dynamic_cast<ServerStreamingGRPCHandler<RequestT, ResponseT>*>(handler)
->Run(&ctx, request, writer);
MaybeAddTrailingMetadata(status, &ctx);
return status;
}
#define DEFINE_GRPC_METHOD(ServiceName, MethodName, RequestType, ResponseType) \
grpc::Status MethodName(grpc::ServerContext* grpc_ctx, \
const RequestType* request, ResponseType* response) \
override { \
return ToGRPCStatus( \
Invoke(#ServiceName, #MethodName, grpc_ctx, env_, request, response)); \
}
// Implementation of the Spanner gRPC service.
class SpannerService : public spanner_api::Spanner::Service {
public:
explicit SpannerService(ServerEnv* env) : env_(env) {}
// Sessions.
DEFINE_GRPC_METHOD(Spanner, CreateSession, spanner_api::CreateSessionRequest,
spanner_api::Session)
DEFINE_GRPC_METHOD(Spanner, GetSession, spanner_api::GetSessionRequest,
spanner_api::Session)
DEFINE_GRPC_METHOD(Spanner, ListSessions, spanner_api::ListSessionsRequest,
spanner_api::ListSessionsResponse)
DEFINE_GRPC_METHOD(Spanner, DeleteSession, spanner_api::DeleteSessionRequest,
protobuf_api::Empty)
DEFINE_GRPC_METHOD(Spanner, BatchCreateSessions,
spanner_api::BatchCreateSessionsRequest,
spanner_api::BatchCreateSessionsResponse)
// Reads.
DEFINE_GRPC_METHOD(Spanner, Read, spanner_api::ReadRequest,
spanner_api::ResultSet);
DEFINE_GRPC_METHOD(Spanner, StreamingRead, spanner_api::ReadRequest,
grpc::ServerWriter<v1::PartialResultSet>);
// Queries.
DEFINE_GRPC_METHOD(Spanner, ExecuteSql, spanner_api::ExecuteSqlRequest,
spanner_api::ResultSet);
DEFINE_GRPC_METHOD(Spanner, ExecuteStreamingSql,
spanner_api::ExecuteSqlRequest,
grpc::ServerWriter<v1::PartialResultSet>);
DEFINE_GRPC_METHOD(Spanner, ExecuteBatchDml,
spanner_api::ExecuteBatchDmlRequest,
spanner_api::ExecuteBatchDmlResponse);
// Batch
DEFINE_GRPC_METHOD(Spanner, BatchWrite, spanner_api::BatchWriteRequest,
grpc::ServerWriter<v1::BatchWriteResponse>);
// Partitions.
DEFINE_GRPC_METHOD(Spanner, PartitionRead, spanner_api::PartitionReadRequest,
spanner_api::PartitionResponse);
DEFINE_GRPC_METHOD(Spanner, PartitionQuery,
spanner_api::PartitionQueryRequest,
spanner_api::PartitionResponse);
// Transactions.
DEFINE_GRPC_METHOD(Spanner, BeginTransaction,
spanner_api::BeginTransactionRequest,
spanner_api::Transaction);
DEFINE_GRPC_METHOD(Spanner, Commit, spanner_api::CommitRequest,
spanner_api::CommitResponse);
DEFINE_GRPC_METHOD(Spanner, Rollback, spanner_api::RollbackRequest,
protobuf_api::Empty);
private:
ServerEnv* const env_;
};
// Implementation of the DatabaseAdmin gRPC service.
class DatabaseAdminService : public database_api::DatabaseAdmin::Service {
public:
explicit DatabaseAdminService(ServerEnv* env) : env_(env) {}
// Databases.
DEFINE_GRPC_METHOD(DatabaseAdmin, ListDatabases,
database_api::ListDatabasesRequest,
database_api::ListDatabasesResponse);
DEFINE_GRPC_METHOD(DatabaseAdmin, CreateDatabase,
database_api::CreateDatabaseRequest,
operations_api::Operation);
DEFINE_GRPC_METHOD(DatabaseAdmin, GetDatabase,
database_api::GetDatabaseRequest, database_api::Database);
DEFINE_GRPC_METHOD(DatabaseAdmin, DropDatabase,
database_api::DropDatabaseRequest, protobuf_api::Empty);
// Schema.
DEFINE_GRPC_METHOD(DatabaseAdmin, UpdateDatabaseDdl,
database_api::UpdateDatabaseDdlRequest,
operations_api::Operation);
DEFINE_GRPC_METHOD(DatabaseAdmin, GetDatabaseDdl,
database_api::GetDatabaseDdlRequest,
database_api::GetDatabaseDdlResponse);
// Policies.
DEFINE_GRPC_METHOD(InstanceAdmin, SetIamPolicy, iam_api::SetIamPolicyRequest,
iam_api::Policy);
DEFINE_GRPC_METHOD(InstanceAdmin, GetIamPolicy, iam_api::GetIamPolicyRequest,
iam_api::Policy);
DEFINE_GRPC_METHOD(InstanceAdmin, TestIamPermissions,
iam_api::TestIamPermissionsRequest,
iam_api::TestIamPermissionsResponse);
private:
ServerEnv* const env_;
};
// Implementation of the InstanceAdmin gRPC service.
class InstanceAdminService : public instance_api::InstanceAdmin::Service {
public:
explicit InstanceAdminService(ServerEnv* env) : env_(env) {}
// Instance configs.
DEFINE_GRPC_METHOD(InstanceAdmin, ListInstanceConfigs,
instance_api::ListInstanceConfigsRequest,
instance_api::ListInstanceConfigsResponse);
DEFINE_GRPC_METHOD(InstanceAdmin, GetInstanceConfig,
instance_api::GetInstanceConfigRequest,
instance_api::InstanceConfig);
// Instances.
DEFINE_GRPC_METHOD(InstanceAdmin, ListInstances,
instance_api::ListInstancesRequest,
instance_api::ListInstancesResponse);
DEFINE_GRPC_METHOD(InstanceAdmin, GetInstance,
instance_api::GetInstanceRequest, instance_api::Instance);
DEFINE_GRPC_METHOD(InstanceAdmin, CreateInstance,
instance_api::CreateInstanceRequest,
operations_api::Operation);
DEFINE_GRPC_METHOD(InstanceAdmin, UpdateInstance,
instance_api::UpdateInstanceRequest,
operations_api::Operation);
DEFINE_GRPC_METHOD(InstanceAdmin, DeleteInstance,
instance_api::DeleteInstanceRequest, protobuf_api::Empty);
// Policies.
DEFINE_GRPC_METHOD(InstanceAdmin, SetIamPolicy, iam_api::SetIamPolicyRequest,
iam_api::Policy);
DEFINE_GRPC_METHOD(InstanceAdmin, GetIamPolicy, iam_api::GetIamPolicyRequest,
iam_api::Policy);
DEFINE_GRPC_METHOD(InstanceAdmin, TestIamPermissions,
iam_api::TestIamPermissionsRequest,
iam_api::TestIamPermissionsResponse);
private:
ServerEnv* const env_;
};
// Implementation of the Operations gRPC service.
class OperationsService : public operations_api::Operations::Service {
public:
explicit OperationsService(ServerEnv* env) : env_(env) {}
DEFINE_GRPC_METHOD(Operations, ListOperations,
operations_api::ListOperationsRequest,
operations_api::ListOperationsResponse);
DEFINE_GRPC_METHOD(Operations, GetOperation,
operations_api::GetOperationRequest,
operations_api::Operation);
DEFINE_GRPC_METHOD(Operations, DeleteOperation,
operations_api::DeleteOperationRequest,
protobuf_api::Empty);
DEFINE_GRPC_METHOD(Operations, CancelOperation,
operations_api::CancelOperationRequest,
protobuf_api::Empty);
DEFINE_GRPC_METHOD(Operations, WaitOperation,
operations_api::WaitOperationRequest,
operations_api::Operation);
private:
ServerEnv* const env_;
};
Server::Server(std::unique_ptr<ServerEnv> env)
: env_(std::move(env)),
database_admin_service_(new DatabaseAdminService(env_.get())),
instance_admin_service_(new InstanceAdminService(env_.get())),
operations_service_(new OperationsService(env_.get())),
spanner_service_(new SpannerService(env_.get())) {}
// Server lifecycle methods.
std::unique_ptr<Server> Server::Create(const Server::Options& options) {
auto env = std::make_unique<ServerEnv>();
std::unique_ptr<Server> server = absl::WrapUnique(new Server(std::move(env)));
::grpc::ServerBuilder builder;
// Configure server address.
server->host_ = options.server_address.substr(
0, options.server_address.find_last_of(':'));
builder.AddListeningPort(options.server_address,
::grpc::InsecureServerCredentials(), &server->port_);
// Configure server message limits.
builder.AddChannelArgument(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH,
limits::kMaxGRPCOutgoingMessageSize);
builder.AddChannelArgument(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH,
limits::kMaxGRPCIncomingMessageSize);
// Configure services exported on this server.
builder.RegisterService(server->spanner_service_.get())
.RegisterService(server->database_admin_service_.get())
.RegisterService(server->instance_admin_service_.get())
.RegisterService(server->operations_service_.get());
// Actually start the server.
server->grpc_server_ = builder.BuildAndStart();
if (server->port_ < 0) {
ABSL_LOG(ERROR) << "Failed to bind to address: " << options.server_address;
return nullptr;
}
return server;
}
void Server::WaitForShutdown() { grpc_server_->Wait(); }
void Server::Shutdown() { grpc_server_->Shutdown(); }
} // namespace frontend
} // namespace emulator
} // namespace spanner
} // namespace google