frontend/handlers/sessions.cc (129 lines of code) (raw):
//
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <algorithm>
#include <memory>
#include <vector>
#include "google/protobuf/empty.pb.h"
#include "google/spanner/v1/spanner.pb.h"
#include "common/errors.h"
#include "common/limits.h"
#include "frontend/collections/database_manager.h"
#include "frontend/common/labels.h"
#include "frontend/common/uris.h"
#include "frontend/entities/database.h"
#include "frontend/entities/session.h"
#include "frontend/server/environment.h"
#include "frontend/server/handler.h"
#include "absl/status/status.h"
#include "zetasql/base/status_macros.h"
namespace protobuf_api = ::google::protobuf;
namespace google {
namespace spanner {
namespace emulator {
namespace frontend {
// Creates a new session.
absl::Status CreateSession(RequestContext* ctx,
const spanner_api::CreateSessionRequest* request,
spanner_api::Session* response) {
// Validate the request.
absl::string_view project_id, instance_id, database_id;
ZETASQL_RETURN_IF_ERROR(ParseDatabaseUri(request->database(), &project_id,
&instance_id, &database_id));
ZETASQL_RETURN_IF_ERROR(ValidateLabels(request->session().labels()));
// Check that the instance is valid.
ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Instance> instance,
GetInstance(ctx, MakeInstanceUri(project_id, instance_id)));
// Fetch the database.
ZETASQL_ASSIGN_OR_RETURN(
std::shared_ptr<Database> database,
ctx->env()->database_manager()->GetDatabase(request->database()));
// Create a session.
Labels labels(request->session().labels().begin(),
request->session().labels().end());
ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Session> session,
ctx->env()->session_manager()->CreateSession(
labels, request->session().multiplexed(), database));
// Return details about the newly created session.
return session->ToProto(response, /*include_labels=*/true);
}
REGISTER_GRPC_HANDLER(Spanner, CreateSession);
// Creates a batch of new sessions.
absl::Status BatchCreateSessions(
RequestContext* ctx, const spanner_api::BatchCreateSessionsRequest* request,
spanner_api::BatchCreateSessionsResponse* response) {
// Validate the request.
absl::string_view project_id, instance_id, database_id;
ZETASQL_RETURN_IF_ERROR(ParseDatabaseUri(request->database(), &project_id,
&instance_id, &database_id));
ZETASQL_RETURN_IF_ERROR(ValidateLabels(request->session_template().labels()));
if (request->session_count() < 0) {
return error::TooFewSessions(request->session_count());
}
// Check that the instance is valid.
ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Instance> instance,
GetInstance(ctx, MakeInstanceUri(project_id, instance_id)));
// Fetch the database to ensure that it exists.
ZETASQL_ASSIGN_OR_RETURN(
std::shared_ptr<Database> database,
ctx->env()->database_manager()->GetDatabase(request->database()));
// Silently truncate requested session count to max allowed session count.
const int32_t actual_session_count =
std::min(limits::kMaxBatchCreateSessionsCount, request->session_count());
// Create the requested sessions.
std::vector<std::shared_ptr<Session>> sessions(actual_session_count);
Labels labels(request->session_template().labels().begin(),
request->session_template().labels().end());
for (int i = 0; i < sessions.size(); ++i) {
ZETASQL_ASSIGN_OR_RETURN(
sessions[i],
ctx->env()->session_manager()->CreateSession(
labels, request->session_template().multiplexed(), database));
}
// Return details about the newly created session.
for (const auto& session : sessions) {
ZETASQL_RETURN_IF_ERROR(session->ToProto(response->add_session()));
}
return absl::OkStatus();
}
REGISTER_GRPC_HANDLER(Spanner, BatchCreateSessions);
// Gets information about a particular session.
absl::Status GetSession(RequestContext* ctx,
const spanner_api::GetSessionRequest* request,
spanner_api::Session* response) {
absl::string_view project_id, instance_id, database_id, session_id;
ZETASQL_RETURN_IF_ERROR(ParseSessionUri(request->name(), &project_id, &instance_id,
&database_id, &session_id));
ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Session> session,
ctx->env()->session_manager()->GetSession(request->name()));
return session->ToProto(response, /*include_labels=*/false);
}
REGISTER_GRPC_HANDLER(Spanner, GetSession);
// Lists all sessions in a given database that match the specified filter.
absl::Status ListSessions(RequestContext* ctx,
const spanner_api::ListSessionsRequest* request,
spanner_api::ListSessionsResponse* response) {
// Validate the request.
absl::string_view project_id, instance_id, database_id;
ZETASQL_RETURN_IF_ERROR(ParseDatabaseUri(request->database(), &project_id,
&instance_id, &database_id));
// Check that the instance is valid.
ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Instance> instance,
GetInstance(ctx, MakeInstanceUri(project_id, instance_id)));
// Fetch the database to ensure that it exists.
ZETASQL_ASSIGN_OR_RETURN(
std::shared_ptr<Database> database,
ctx->env()->database_manager()->GetDatabase(request->database()));
// List all sessions for the given database.
ZETASQL_ASSIGN_OR_RETURN(
std::vector<std::shared_ptr<Session>> sessions,
ctx->env()->session_manager()->ListSessions(database->database_uri()));
int32_t page_size = request->page_size();
static const int32_t kMaxPageSize = 1000;
if (page_size <= 0 || page_size > kMaxPageSize) {
page_size = kMaxPageSize;
}
// Sessions returned from session manager are sorted by session_uri and
// thus we use first session uri after requested page size as next_page_token.
for (const auto& session : sessions) {
if (response->sessions_size() >= page_size) {
response->set_next_page_token(session->session_uri());
break;
}
if (session->session_uri() >= request->page_token()) {
ZETASQL_RETURN_IF_ERROR(session->ToProto(response->add_sessions(),
/*include_labels=*/true));
}
}
return absl::OkStatus();
}
REGISTER_GRPC_HANDLER(Spanner, ListSessions);
// Ends a session, releasing server resources associated with it.
absl::Status DeleteSession(RequestContext* ctx,
const spanner_api::DeleteSessionRequest* request,
protobuf_api::Empty* response) {
absl::string_view project_id, instance_id, database_id, session_id;
ZETASQL_RETURN_IF_ERROR(ParseSessionUri(request->name(), &project_id, &instance_id,
&database_id, &session_id));
return ctx->env()->session_manager()->DeleteSession(request->name());
}
REGISTER_GRPC_HANDLER(Spanner, DeleteSession);
} // namespace frontend
} // namespace emulator
} // namespace spanner
} // namespace google