frontend/handlers/batch.cc (104 lines of code) (raw):
//
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <memory>
#include "google/protobuf/struct.pb.h"
#include "google/protobuf/timestamp.pb.h"
#include "google/spanner/v1/query_plan.pb.h"
#include "google/spanner/v1/result_set.pb.h"
#include "google/spanner/v1/spanner.pb.h"
#include "google/spanner/v1/transaction.pb.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/time/time.h"
#include "backend/access/write.h"
#include "backend/schema/catalog/schema.h"
#include "frontend/common/protos.h"
#include "frontend/converters/mutations.h"
#include "frontend/converters/time.h"
#include "frontend/entities/session.h"
#include "frontend/entities/transaction.h"
#include "frontend/proto/partition_token.pb.h"
#include "frontend/server/handler.h"
#include "frontend/server/request_context.h"
#include "zetasql/base/status_macros.h"
namespace google {
namespace spanner {
namespace emulator {
namespace frontend {
namespace spanner_api = ::google::spanner::v1;
namespace {
// Helper function to set the status in the response.
void SetResponseStatus(spanner_api::BatchWriteResponse* response,
const absl::Status& status) {
*response->mutable_status() = StatusToProto(status);
}
// Helper function to process a single mutation group within a transaction.
absl::Status ProcessMutationGroup(
const backend::Schema* schema,
const spanner_api::BatchWriteRequest::MutationGroup& mutation_group,
Transaction* txn, spanner_api::BatchWriteResponse* response) {
backend::Mutation backend_mutation;
absl::Status status =
MutationFromProto(*schema, mutation_group.mutations(), &backend_mutation);
if (!status.ok()) {
SetResponseStatus(response, status);
txn->MaybeInvalidate(status);
return status;
}
status = txn->Write(backend_mutation);
if (!status.ok()) {
SetResponseStatus(response, status);
txn->MaybeInvalidate(status);
return status;
}
status = txn->Commit();
if (!status.ok()) {
SetResponseStatus(response, status);
txn->MaybeInvalidate(status);
return status;
}
absl::StatusOr<absl::Time> commit_time = txn->GetCommitTimestamp();
if (!commit_time.ok()) {
SetResponseStatus(response, commit_time.status());
return commit_time.status();
}
absl::StatusOr<google::protobuf::Timestamp> commit_time_proto =
TimestampToProto(*commit_time);
if (!commit_time_proto.ok()) {
SetResponseStatus(response, commit_time_proto.status());
return commit_time_proto.status();
}
*response->mutable_commit_timestamp() = *commit_time_proto;
return absl::OkStatus();
}
} // namespace
absl::Status BatchWrite(RequestContext* ctx,
const spanner_api::BatchWriteRequest* request,
ServerStream<spanner_api::BatchWriteResponse>* stream) {
ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Session> session,
GetSession(ctx, request->session()));
for (int i = 0; i < request->mutation_groups_size(); ++i) {
const spanner_api::BatchWriteRequest::MutationGroup& mutation_group =
request->mutation_groups(i);
spanner_api::BatchWriteResponse response;
response.add_indexes(i);
spanner_api::TransactionOptions txn_options;
txn_options.mutable_read_write();
absl::StatusOr<std::shared_ptr<Transaction>> single_use_transaction =
session->CreateSingleUseTransaction(txn_options);
if (!single_use_transaction.ok()) {
SetResponseStatus(&response, single_use_transaction.status());
stream->Send(response);
continue;
}
std::shared_ptr<Transaction> txn = *single_use_transaction;
// Wrap all operations on this transaction so they are atomic.
absl::Status txn_status =
txn->GuardedCall(Transaction::OpType::kCommit, [&]() -> absl::Status {
return ProcessMutationGroup(txn->schema(), mutation_group, txn.get(),
&response);
});
stream->Send(response);
}
return absl::OkStatus();
}
REGISTER_GRPC_HANDLER(Spanner, BatchWrite);
} // namespace frontend
} // namespace emulator
} // namespace spanner
} // namespace google