backend/transaction/transaction_store.cc (320 lines of code) (raw):
//
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include "backend/transaction/transaction_store.h"
#include <algorithm>
#include <memory>
#include <utility>
#include <variant>
#include <vector>
#include "zetasql/public/value.h"
#include "absl/container/flat_hash_map.h"
#include "absl/log/check.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/time/time.h"
#include "absl/types/span.h"
#include "backend/actions/ops.h"
#include "backend/common/rows.h"
#include "backend/common/variant.h"
#include "backend/datamodel/key.h"
#include "backend/datamodel/key_range.h"
#include "backend/datamodel/value.h"
#include "backend/locking/request.h"
#include "backend/schema/catalog/column.h"
#include "backend/schema/catalog/index.h"
#include "backend/schema/catalog/table.h"
#include "backend/storage/in_memory_iterator.h"
#include "backend/storage/iterator.h"
#include "backend/transaction/commit_timestamp.h"
#include "common/errors.h"
#include "zetasql/base/status_macros.h"
namespace google {
namespace spanner {
namespace emulator {
namespace backend {
namespace {
bool SortByKey(const FixedRowStorageIterator::Row& a,
const FixedRowStorageIterator::Row& b) {
return a.first < b.first;
}
void ResetInvalidValuesToNull(absl::Span<const Column* const> columns,
ValueList* values) {
if (!values) {
return;
}
ABSL_DCHECK(columns.size() == values->size());
for (int i = 0; i < columns.size(); ++i) {
if (!values->at(i).is_valid()) {
values->at(i) = zetasql::values::Null(columns[i]->GetType());
}
}
}
} // namespace
absl::Status TransactionStore::AcquireReadLock(
const Table* table, const KeyRange& key_range,
absl::Span<const Column* const> columns) const {
lock_handle_->EnqueueLock(LockRequest(LockMode::kShared, table->id(),
key_range, GetColumnIDs(columns)));
return lock_handle_->Wait();
}
absl::Status TransactionStore::AcquireWriteLock(
const Table* table, const KeyRange& key_range,
absl::Span<const Column* const> columns) const {
lock_handle_->EnqueueLock(LockRequest(LockMode::kExclusive, table->id(),
key_range, GetColumnIDs(columns)));
return lock_handle_->Wait();
}
absl::Status TransactionStore::BufferInsert(
const Table* table, const Key& key, absl::Span<const Column* const> columns,
const ValueList& values) {
// Acquire locks to prevent another transaction to modify this entity.
ZETASQL_RETURN_IF_ERROR(AcquireWriteLock(table, KeyRange::Point(key), columns));
RowOp row_op;
bool row_exists = RowExistsInBuffer(table, key, &row_op);
Row row_values;
if (row_exists) {
// There is an existing delete on this row. Normalize this insert
// with the previous delete mutation.
row_values = row_op.second;
}
// Buffer the insert mutation with the row values to be inserted.
for (int i = 0; i < columns.size(); ++i) {
row_values[columns[i]] = values[i];
}
buffered_ops_[table][key] = std::make_pair(OpType::kInsert, row_values);
return absl::OkStatus();
}
absl::Status TransactionStore::BufferUpdate(
const Table* table, const Key& key, absl::Span<const Column* const> columns,
const ValueList& values) {
// Acquire locks to prevent another transaction to modify this entity.
ZETASQL_RETURN_IF_ERROR(AcquireWriteLock(table, KeyRange::Point(key), columns));
RowOp row_op;
bool row_exists = RowExistsInBuffer(table, key, &row_op);
// Buffer the update mutation with the cell values to be updated.
OpType op_type;
Row row_values;
if (row_exists) {
// There is an existing insert or update on this row. Normalize this update
// with the previous mutation.
row_values = row_op.second;
// If the previous mutation on this row is an insert, keep the OpType
// as insert, since updating an inserted row will appear as a single insert.
op_type = row_op.first;
} else {
op_type = OpType::kUpdate;
}
for (int i = 0; i < columns.size(); ++i) {
row_values[columns[i]] = values[i];
}
buffered_ops_[table][key] = std::make_pair(op_type, row_values);
return absl::OkStatus();
}
absl::Status TransactionStore::BufferDelete(const Table* table,
const Key& key) {
// Acquire locks to prevent another transaction to modify this entity.
ZETASQL_RETURN_IF_ERROR(AcquireWriteLock(table, KeyRange::Point(key), {}));
RowOp row_op;
// This canonicalization is required by change streams. If there's an INSERT
// prior to this DELETE to the same row in the same transaction, the two
// operation should collapse. However, if the row already exists in storage,
// the DELETE shouldn't be collapsed (e.g. DELETE INSERT DELETE to an existing
// row).
if (RowExistsInBuffer(table, key, &row_op) &&
!RowExistsInStorage(table, key)) {
buffered_ops_[table].erase(key);
} else {
// Marking all columns null to indicate a delete.
Row row_values;
for (auto column : table->columns()) {
row_values[column] = zetasql::values::Null(column->GetType());
}
buffered_ops_[table][key] = std::make_pair(OpType::kDelete, row_values);
}
return absl::OkStatus();
}
absl::Status TransactionStore::BufferWriteOp(const WriteOp& op) {
return std::visit(
overloaded{
[&](const InsertOp& op) {
return BufferInsert(op.table, op.key, op.columns, op.values);
},
[&](const UpdateOp& op) {
return BufferUpdate(op.table, op.key, op.columns, op.values);
},
[&](const DeleteOp& op) { return BufferDelete(op.table, op.key); },
},
op);
}
absl::StatusOr<ValueList> TransactionStore::ReadCommitted(
const Table* table, const Key& key,
std::vector<const Column*> columns) const {
ZETASQL_RETURN_IF_ERROR(AcquireReadLock(table, KeyRange::Point(key), columns));
std::unique_ptr<StorageIterator> base_itr;
ZETASQL_RETURN_IF_ERROR(base_storage_->Read(absl::InfiniteFuture(), table->id(),
KeyRange::Point(key),
GetColumnIDs(columns), &base_itr));
ValueList values;
while (base_itr->Next()) {
for (int i = 0; i < base_itr->NumColumns(); i++) {
if (base_itr->ColumnValue(i).is_valid()) {
values.push_back(base_itr->ColumnValue(i));
} else {
values.push_back(zetasql::values::Null(columns[i]->GetType()));
}
}
}
return values;
}
// Reads keys within the given range and add to StorageIterator. This is done by
// reading from both the base storage & transaction store.
// - Read keys from base storage (and apply merges as mentioned below).
// - Read keys from transaction store and perform the following action
// for each of OpType as:
// - insert: add to output storage iterator
// - update: these updates should be applied over the base storage row.
// - delete: should remove the key read from the base storage.
absl::Status TransactionStore::Read(
const Table* table, const KeyRange& key_range,
absl::Span<const Column* const> columns,
std::unique_ptr<StorageIterator>* storage_itr,
bool allow_pending_commit_timestamps_in_read) const {
// Acquire locks to prevent another transaction to modify this entity.
ZETASQL_RETURN_IF_ERROR(AcquireReadLock(table, key_range, columns));
// Read rows buffered within transaction store.
// Table lookup.
std::vector<FixedRowStorageIterator::Row> rows;
auto table_itr = buffered_ops_.find(table);
if (table_itr != buffered_ops_.end()) {
auto table = table_itr->second;
// Key range lookup.
auto begin_itr = table.lower_bound(key_range.start_key());
auto end_itr = table.lower_bound(key_range.limit_key());
for (auto itr = begin_itr; itr != end_itr; ++itr) {
if (itr->second.first == OpType::kInsert) {
// Add inserts into the StorageIterator.
const Row& row_values = itr->second.second;
ValueList values;
values.reserve(columns.size());
for (const Column* column : columns) {
if (row_values.find(column) == row_values.end()) {
values.emplace_back(zetasql::values::Null(column->GetType()));
} else {
values.emplace_back(row_values.at(column));
}
}
rows.emplace_back(itr->first, std::move(values));
}
}
}
auto buffered_rows_count = rows.size();
// Read from the base storage and apply the changes buffered in transaction
// store.
std::unique_ptr<StorageIterator> base_itr;
ZETASQL_RETURN_IF_ERROR(base_storage_->Read(absl::InfiniteFuture(), table->id(),
key_range, GetColumnIDs(columns),
&base_itr));
while (base_itr->Next()) {
ValueList values;
values.reserve(columns.size());
RowOp row_op;
// Merge column values from transaction store & base storage.
if (RowExistsInBuffer(table, base_itr->Key(), &row_op)) {
if (row_op.first == OpType::kDelete || row_op.first == OpType::kInsert) {
// Omit the deletes from the output. The buffered inserts have already
// been added to the output.
continue;
}
if (row_op.first == OpType::kUpdate) {
// Update the column values to reflect the changes in transaction store.
for (int i = 0; i < columns.size(); i++) {
if (row_op.second.find(columns[i]) != row_op.second.end()) {
values.emplace_back(row_op.second[columns[i]]);
} else if (base_itr->ColumnValue(i).is_valid()) {
values.emplace_back(base_itr->ColumnValue(i));
} else {
values.emplace_back(zetasql::values::Null(columns[i]->GetType()));
}
}
}
} else {
// Copy the base storage column values since this row does not exists in
// transaction store.
for (int i = 0; i < columns.size(); i++) {
if (base_itr->ColumnValue(i).is_valid()) {
values.emplace_back(base_itr->ColumnValue(i));
} else {
values.emplace_back(zetasql::values::Null(columns[i]->GetType()));
}
}
}
rows.emplace_back(base_itr->Key(), std::move(values));
}
// Pending commit timestamp values in buffer cannot be returned to
// clients.
if (!allow_pending_commit_timestamps_in_read) {
ZETASQL_RETURN_IF_ERROR(commit_timestamp_tracker_->CheckRead(table, columns));
}
// The keys need to be sorted to provide iterating in order. The rows added
// from buffered ops and the ones added from base storage were already in
// sorted order respectively. Merge these two sorted subarrays to get overall
// sorted order.
ABSL_DCHECK(buffered_rows_count <= rows.size())
<< buffered_rows_count << " vs " << rows.size();
auto first_base_row_iter = rows.begin() + buffered_rows_count;
std::inplace_merge(rows.begin(), first_base_row_iter, rows.end(), SortByKey);
*storage_itr = std::make_unique<FixedRowStorageIterator>(std::move(rows));
return absl::OkStatus();
}
bool TransactionStore::RowExistsInStorage(const Table* table, const Key& key) {
absl::Status row_in_base_storage =
base_storage_->Lookup(absl::InfiniteFuture(), table->id(), key, {}, {});
return row_in_base_storage.code() != absl::StatusCode::kNotFound;
}
bool TransactionStore::RowExistsInBuffer(const Table* table, const Key& key,
RowOp* row_op) const {
const auto table_itr = buffered_ops_.find(table);
if (table_itr == buffered_ops_.end()) {
// Table does not exist. This can happen if the table is empty.
return false;
}
const auto row_op_itr = table_itr->second.find(key);
if (row_op_itr == table_itr->second.end()) {
// Key does not exist.
return false;
}
*row_op = row_op_itr->second;
return true;
}
absl::StatusOr<ValueList> TransactionStore::Lookup(
const Table* table, const Key& key,
absl::Span<const Column* const> columns) const {
ValueList values;
// Acquire locks to prevent another transaction to modify this entity.
ZETASQL_RETURN_IF_ERROR(AcquireReadLock(table, KeyRange::Point(key), columns));
// Check if row exists within the buffer.
RowOp row_op;
if (RowExistsInBuffer(table, key, &row_op)) {
switch (row_op.first) {
case OpType::kInsert: {
// Fetch the latest value from the cell.
for (auto column : columns) {
auto row_value = row_op.second.find(column);
if (row_value != row_op.second.end()) {
values.emplace_back(row_value->second);
} else {
values.emplace_back(zetasql::values::Null(column->GetType()));
}
}
break;
}
case OpType::kUpdate: {
// For update, the base storage needs to be checked to retrieve values
// which might not be included in the update.
ZETASQL_RETURN_IF_ERROR(base_storage_->Lookup(absl::InfiniteFuture(),
table->id(), key,
GetColumnIDs(columns), &values));
ResetInvalidValuesToNull(columns, &values);
for (int i = 0; i < columns.size(); ++i) {
// Update values retrieved from base storage with new values.
auto row_value = row_op.second.find(columns[i]);
if (row_value != row_op.second.end()) {
values[i] = row_value->second;
}
}
break;
}
// Ignore delete operations.
case OpType::kDelete: {
return error::RowNotFound(table->id(), key.DebugString());
break;
}
}
return values;
}
ZETASQL_RETURN_IF_ERROR(base_storage_->Lookup(absl::InfiniteFuture(), table->id(),
key, GetColumnIDs(columns), &values));
ResetInvalidValuesToNull(columns, &values);
return values;
}
std::vector<WriteOp> TransactionStore::GetBufferedOps() const {
std::vector<WriteOp> buffered_ops;
for (const auto& entry : buffered_ops_) {
const Table* table = entry.first;
for (const auto& row : entry.second) {
const Key& key = row.first;
const RowOp& row_op = row.second;
std::vector<const Column*> columns;
ValueList values;
for (const auto& cell : row_op.second) {
columns.emplace_back(cell.first);
values.emplace_back(cell.second);
}
switch (row_op.first) {
case OpType::kInsert: {
buffered_ops.emplace_back(InsertOp{table, key, columns, values});
break;
}
case OpType::kUpdate: {
buffered_ops.emplace_back(UpdateOp{table, key, columns, values});
break;
}
case OpType::kDelete: {
buffered_ops.emplace_back(DeleteOp{table, key});
break;
}
}
}
}
return buffered_ops;
}
} // namespace backend
} // namespace emulator
} // namespace spanner
} // namespace google